Created
November 13, 2024 15:12
-
-
Save oNddleo/e1bcd3f5979f5740defd80a714839374 to your computer and use it in GitHub Desktop.
Sync account template
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sync | |
import ( | |
"context" | |
"database/sql" | |
"time" | |
) | |
// Account represents the account data structure | |
type Account struct { | |
ID string `json:"id"` | |
ExternalID string `json:"external_id"` | |
Data JSONMap `json:"data"` // Flexible JSON data from third party | |
Status string `json:"status"` // active, inactive, deleted | |
LastSyncedAt time.Time `json:"last_synced"` | |
LastUpdatedAt time.Time `json:"last_updated"` // From third party | |
SyncVersion int64 `json:"sync_version"` // Incremented each sync | |
} | |
type JSONMap map[string]interface{} | |
// SyncService handles account synchronization | |
type SyncService struct { | |
db *sql.DB | |
thirdParty ThirdPartyClient | |
} | |
// ThirdPartyClient interface for third party API | |
type ThirdPartyClient interface { | |
GetAccounts(ctx context.Context, since time.Time) ([]Account, error) | |
GetDeletedAccounts(ctx context.Context, since time.Time) ([]string, error) | |
} | |
// SQL schema | |
const schema = ` | |
CREATE TABLE IF NOT EXISTS accounts ( | |
id VARCHAR(36) PRIMARY KEY, | |
external_id VARCHAR(255) UNIQUE NOT NULL, | |
data JSONB NOT NULL, | |
status VARCHAR(20) NOT NULL, | |
last_synced_at TIMESTAMP NOT NULL, | |
last_updated_at TIMESTAMP NOT NULL, | |
sync_version BIGINT NOT NULL, | |
created_at TIMESTAMP NOT NULL DEFAULT NOW() | |
); | |
CREATE INDEX idx_accounts_external_id ON accounts(external_id); | |
CREATE INDEX idx_accounts_status ON accounts(status); | |
CREATE INDEX idx_accounts_last_synced ON accounts(last_synced_at); | |
-- Sync history table to track changes | |
CREATE TABLE IF NOT EXISTS sync_history ( | |
id SERIAL PRIMARY KEY, | |
sync_version BIGINT NOT NULL, | |
started_at TIMESTAMP NOT NULL, | |
completed_at TIMESTAMP, | |
accounts_updated INT DEFAULT 0, | |
accounts_deleted INT DEFAULT 0, | |
status VARCHAR(20) NOT NULL | |
); | |
` | |
// SyncAccounts performs the daily synchronization | |
func (s *SyncService) SyncAccounts(ctx context.Context) error { | |
tx, err := s.db.BeginTx(ctx, nil) | |
if err != nil { | |
return err | |
} | |
defer tx.Rollback() | |
// Create new sync record | |
syncVersion := time.Now().Unix() | |
_, err = tx.ExecContext(ctx, ` | |
INSERT INTO sync_history (sync_version, started_at, status) | |
VALUES ($1, NOW(), 'in_progress') | |
`, syncVersion) | |
if err != nil { | |
return err | |
} | |
// Get last sync time | |
var lastSync time.Time | |
err = tx.QueryRowContext(ctx, ` | |
SELECT COALESCE(MAX(last_synced_at), '2000-01-01') | |
FROM accounts | |
`).Scan(&lastSync) | |
if err != nil { | |
return err | |
} | |
// Get accounts from third party | |
accounts, err := s.thirdParty.GetAccounts(ctx, lastSync) | |
if err != nil { | |
return err | |
} | |
// Update or insert accounts | |
for _, account := range accounts { | |
err = s.upsertAccount(ctx, tx, account, syncVersion) | |
if err != nil { | |
return err | |
} | |
} | |
// Handle deletions | |
deletedIDs, err := s.thirdParty.GetDeletedAccounts(ctx, lastSync) | |
if err != nil { | |
return err | |
} | |
for _, id := range deletedIDs { | |
err = s.markAccountDeleted(ctx, tx, id, syncVersion) | |
if err != nil { | |
return err | |
} | |
} | |
// Mark old records as inactive if they weren't updated in this sync | |
_, err = tx.ExecContext(ctx, ` | |
UPDATE accounts | |
SET status = 'inactive', | |
last_synced_at = NOW() | |
WHERE sync_version < $1 | |
AND status = 'active' | |
`, syncVersion) | |
if err != nil { | |
return err | |
} | |
// Update sync history | |
_, err = tx.ExecContext(ctx, ` | |
UPDATE sync_history | |
SET completed_at = NOW(), | |
accounts_updated = (SELECT COUNT(*) FROM accounts WHERE sync_version = $1), | |
accounts_deleted = (SELECT COUNT(*) FROM accounts WHERE status = 'deleted' AND sync_version = $1), | |
status = 'completed' | |
WHERE sync_version = $1 | |
`, syncVersion) | |
if err != nil { | |
return err | |
} | |
return tx.Commit() | |
} | |
func (s *SyncService) upsertAccount(ctx context.Context, tx *sql.Tx, account Account, syncVersion int64) error { | |
_, err := tx.ExecContext(ctx, ` | |
INSERT INTO accounts ( | |
id, external_id, data, status, last_synced_at, last_updated_at, sync_version | |
) VALUES ($1, $2, $3, $4, NOW(), $5, $6) | |
ON CONFLICT (external_id) DO UPDATE SET | |
data = EXCLUDED.data, | |
status = EXCLUDED.status, | |
last_synced_at = NOW(), | |
last_updated_at = EXCLUDED.last_updated_at, | |
sync_version = EXCLUDED.sync_version | |
`, account.ID, account.ExternalID, account.Data, "active", account.LastUpdatedAt, syncVersion) | |
return err | |
} | |
func (s *SyncService) markAccountDeleted(ctx context.Context, tx *sql.Tx, externalID string, syncVersion int64) error { | |
_, err := tx.ExecContext(ctx, ` | |
UPDATE accounts | |
SET status = 'deleted', | |
last_synced_at = NOW(), | |
sync_version = $1 | |
WHERE external_id = $2 | |
`, syncVersion, externalID) | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've designed a robust synchronization service that handles the account data sync challenges. Here are the key features:
Version Tracking:
sync_version
(using Unix timestamp)Status Management:
active
,inactive
, ordeleted
inactive
deleted
Conflict Resolution:
last_synced_at
andlast_updated_at
Sync History:
sync_history
tableTransaction Safety:
To use this service:
ThirdPartyClient
interface for your specific third partySyncAccounts
at the end of each day (e.g., via cron job)Would you like me to explain any particular part in more detail or add additional features like retry logic or error handling?