Skip to content

Instantly share code, notes, and snippets.

@oNddleo
Created November 13, 2024 15:12
Show Gist options
  • Save oNddleo/e1bcd3f5979f5740defd80a714839374 to your computer and use it in GitHub Desktop.
Save oNddleo/e1bcd3f5979f5740defd80a714839374 to your computer and use it in GitHub Desktop.
Sync account template
package sync
import (
"context"
"database/sql"
"time"
)
// Account represents the account data structure
type Account struct {
ID string `json:"id"`
ExternalID string `json:"external_id"`
Data JSONMap `json:"data"` // Flexible JSON data from third party
Status string `json:"status"` // active, inactive, deleted
LastSyncedAt time.Time `json:"last_synced"`
LastUpdatedAt time.Time `json:"last_updated"` // From third party
SyncVersion int64 `json:"sync_version"` // Incremented each sync
}
type JSONMap map[string]interface{}
// SyncService handles account synchronization
type SyncService struct {
db *sql.DB
thirdParty ThirdPartyClient
}
// ThirdPartyClient interface for third party API
type ThirdPartyClient interface {
GetAccounts(ctx context.Context, since time.Time) ([]Account, error)
GetDeletedAccounts(ctx context.Context, since time.Time) ([]string, error)
}
// SQL schema
const schema = `
CREATE TABLE IF NOT EXISTS accounts (
id VARCHAR(36) PRIMARY KEY,
external_id VARCHAR(255) UNIQUE NOT NULL,
data JSONB NOT NULL,
status VARCHAR(20) NOT NULL,
last_synced_at TIMESTAMP NOT NULL,
last_updated_at TIMESTAMP NOT NULL,
sync_version BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_accounts_external_id ON accounts(external_id);
CREATE INDEX idx_accounts_status ON accounts(status);
CREATE INDEX idx_accounts_last_synced ON accounts(last_synced_at);
-- Sync history table to track changes
CREATE TABLE IF NOT EXISTS sync_history (
id SERIAL PRIMARY KEY,
sync_version BIGINT NOT NULL,
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
accounts_updated INT DEFAULT 0,
accounts_deleted INT DEFAULT 0,
status VARCHAR(20) NOT NULL
);
`
// SyncAccounts performs the daily synchronization
func (s *SyncService) SyncAccounts(ctx context.Context) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Create new sync record
syncVersion := time.Now().Unix()
_, err = tx.ExecContext(ctx, `
INSERT INTO sync_history (sync_version, started_at, status)
VALUES ($1, NOW(), 'in_progress')
`, syncVersion)
if err != nil {
return err
}
// Get last sync time
var lastSync time.Time
err = tx.QueryRowContext(ctx, `
SELECT COALESCE(MAX(last_synced_at), '2000-01-01')
FROM accounts
`).Scan(&lastSync)
if err != nil {
return err
}
// Get accounts from third party
accounts, err := s.thirdParty.GetAccounts(ctx, lastSync)
if err != nil {
return err
}
// Update or insert accounts
for _, account := range accounts {
err = s.upsertAccount(ctx, tx, account, syncVersion)
if err != nil {
return err
}
}
// Handle deletions
deletedIDs, err := s.thirdParty.GetDeletedAccounts(ctx, lastSync)
if err != nil {
return err
}
for _, id := range deletedIDs {
err = s.markAccountDeleted(ctx, tx, id, syncVersion)
if err != nil {
return err
}
}
// Mark old records as inactive if they weren't updated in this sync
_, err = tx.ExecContext(ctx, `
UPDATE accounts
SET status = 'inactive',
last_synced_at = NOW()
WHERE sync_version < $1
AND status = 'active'
`, syncVersion)
if err != nil {
return err
}
// Update sync history
_, err = tx.ExecContext(ctx, `
UPDATE sync_history
SET completed_at = NOW(),
accounts_updated = (SELECT COUNT(*) FROM accounts WHERE sync_version = $1),
accounts_deleted = (SELECT COUNT(*) FROM accounts WHERE status = 'deleted' AND sync_version = $1),
status = 'completed'
WHERE sync_version = $1
`, syncVersion)
if err != nil {
return err
}
return tx.Commit()
}
func (s *SyncService) upsertAccount(ctx context.Context, tx *sql.Tx, account Account, syncVersion int64) error {
_, err := tx.ExecContext(ctx, `
INSERT INTO accounts (
id, external_id, data, status, last_synced_at, last_updated_at, sync_version
) VALUES ($1, $2, $3, $4, NOW(), $5, $6)
ON CONFLICT (external_id) DO UPDATE SET
data = EXCLUDED.data,
status = EXCLUDED.status,
last_synced_at = NOW(),
last_updated_at = EXCLUDED.last_updated_at,
sync_version = EXCLUDED.sync_version
`, account.ID, account.ExternalID, account.Data, "active", account.LastUpdatedAt, syncVersion)
return err
}
func (s *SyncService) markAccountDeleted(ctx context.Context, tx *sql.Tx, externalID string, syncVersion int64) error {
_, err := tx.ExecContext(ctx, `
UPDATE accounts
SET status = 'deleted',
last_synced_at = NOW(),
sync_version = $1
WHERE external_id = $2
`, syncVersion, externalID)
return err
}
@oNddleo
Copy link
Author

oNddleo commented Nov 13, 2024

I've designed a robust synchronization service that handles the account data sync challenges. Here are the key features:

  1. Version Tracking:

    • Each sync operation gets a unique sync_version (using Unix timestamp)
    • Every account record is tagged with the version of its last sync
    • This helps track which records were updated in each sync run
  2. Status Management:

    • Accounts can be active, inactive, or deleted
    • Records not updated in latest sync are marked as inactive
    • Explicitly deleted accounts are marked as deleted
  3. Conflict Resolution:

    • Uses PostgreSQL's UPSERT (INSERT ... ON CONFLICT) to handle conflicts
    • Always takes the latest data from third party
    • Maintains audit trail through last_synced_at and last_updated_at
  4. Sync History:

    • Tracks each sync operation in sync_history table
    • Records number of updates and deletions
    • Helps with monitoring and debugging
  5. Transaction Safety:

    • All operations are wrapped in a transaction
    • Ensures data consistency even if sync fails midway

To use this service:

  1. Initialize the database with the schema
  2. Implement the ThirdPartyClient interface for your specific third party
  3. Run SyncAccounts at the end of each day (e.g., via cron job)

Would you like me to explain any particular part in more detail or add additional features like retry logic or error handling?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment