Last active
October 8, 2024 05:09
-
-
Save raffecat/58961e86abe0c044e24193de9fe06947 to your computer and use it in GitHub Desktop.
Database migration from DogeNet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Schema and migration SQL. The initial schema counts as version 1;
// every later version is reached by running one entry of MIGRATIONS.
const (
	// SQL_SCHEMA creates the version-1 schema.
	// NOTE(review): the DDL is elided ("...") in this listing; presumably
	// the full text also creates the migration and config tables that
	// initSchema queries. Confirm against the real schema.
	SQL_SCHEMA string = `
CREATE TABLE IF NOT EXISTS announce ...
`

	// SQL_MIGRATION_v2 upgrades a version-1 database to version 2.
	SQL_MIGRATION_v2 string = `
ALTER TABLE announce ADD COLUMN owner BLOB
`
)

// MIGRATIONS pairs each schema version with the SQL that produces it.
// initSchema walks the slice in order and runs every entry whose ver is
// greater than the version currently stored in the database.
var MIGRATIONS = []struct {
	ver   int
	query string
}{
	{ver: 2, query: SQL_MIGRATION_v2},
}
func (s *SQLiteStore) initSchema(ctx context.Context) error { | |
// doTxn is optional, this is what DogeNet does. | |
return s.doTxn("init schema", func(tx *sql.Tx) error { | |
// apply migrations | |
verRow := tx.QueryRow("SELECT version FROM migration LIMIT 1") | |
var version int | |
err := verRow.Scan(&version) | |
if err != nil { | |
// first-time database init. | |
// init schema (idempotent) | |
_, err := tx.Exec(SQL_SCHEMA) | |
if err != nil { | |
return dbErr(err, "creating database schema") | |
} | |
// set up version table (idempotent) | |
err = tx.QueryRow("SELECT version FROM migration LIMIT 1").Scan(&version) | |
if err != nil { | |
if errors.Is(err, sql.ErrNoRows) { | |
version = 1 | |
_, err = tx.Exec("INSERT INTO migration (version) VALUES (?)", version) | |
if err != nil { | |
return dbErr(err, "updating version") | |
} | |
} else { | |
return dbErr(err, "querying version") | |
} | |
} | |
// set up config table (idempotent) | |
// this is application-specific. | |
var dayc int | |
err = tx.QueryRow("SELECT dayc FROM config LIMIT 1").Scan(&dayc) | |
if err != nil { | |
if errors.Is(err, sql.ErrNoRows) { | |
_, err = tx.Exec("INSERT INTO config (dayc,last) VALUES (1,?)", unixDayStamp()) | |
if err != nil { | |
return dbErr(err, "inserting config row") | |
} | |
} else { | |
return dbErr(err, "querying config") | |
} | |
} | |
} | |
// apply all new migrations. | |
initVer := version | |
for _, m := range MIGRATIONS { | |
if version < m.ver { | |
_, err = tx.Exec(m.query) | |
if err != nil { | |
return dbErr(err, fmt.Sprintf("applying migration %v", m.ver)) | |
} | |
version = m.ver | |
} | |
} | |
if version != initVer { | |
_, err = tx.Exec("UPDATE migration SET version=?", version) | |
if err != nil { | |
return dbErr(err, "updating version") | |
} | |
} | |
return nil | |
}) | |
} | |
// SQLiteStore is a Store backed by a SQLite database.
//
// It carries a cancellable Context so that shutdown is clean: doTxn
// retries transactions on database conflicts (competing queries), and
// the Context lets the wait between retries be interrupted.
type SQLiteStore struct {
	db  *sql.DB         // database handle used to begin transactions
	ctx context.Context // cancellation signal observed by Sleep
}
// NewSQLiteStore returns a spec.Store implementation that uses SQLite | |
func NewSQLiteStore(fileName string, ctx context.Context) (spec.Store, error) { | |
db, err := sql.Open("sqlite3", fileName) | |
store := &SQLiteStore{db: db, ctx: ctx} | |
if err != nil { | |
return store, dbErr(err, "opening database") | |
} | |
db.SetMaxOpenConns(1) // limit concurrent access to avoid "database locked" | |
err = store.initSchema(ctx) | |
return store, err | |
} | |
func (s SQLiteStore) doTxn(name string, work func(tx *sql.Tx) error) error { | |
limit := 120 | |
for { | |
tx, err := s.db.Begin() | |
if err != nil { | |
if IsConflict(err) { | |
s.Sleep(250 * time.Millisecond) | |
limit-- | |
if limit != 0 { | |
continue | |
} | |
} | |
return dbErr(err, "cannot begin transaction: "+name) | |
} | |
defer tx.Rollback() | |
err = work(tx) | |
if err != nil { | |
if IsConflict(err) { | |
s.Sleep(250 * time.Millisecond) | |
limit-- | |
if limit != 0 { | |
continue | |
} | |
} | |
return err | |
} | |
err = tx.Commit() | |
if err != nil { | |
if IsConflict(err) { | |
s.Sleep(250 * time.Millisecond) | |
limit-- | |
if limit != 0 { | |
continue | |
} | |
} | |
return dbErr(err, "cannot commit: "+name) | |
} | |
return nil | |
} | |
} | |
func (s SQLiteStore) Sleep(dur time.Duration) { | |
select { | |
case <-s.ctx.Done(): | |
case <-time.After(dur): | |
} | |
} | |
func IsConflict(err error) bool { | |
if sqErr, isSq := err.(sqlite3.Error); isSq { | |
if sqErr.Code == sqlite3.ErrBusy || sqErr.Code == sqlite3.ErrLocked { | |
return true | |
} | |
} | |
return false | |
} | |
func dbErr(err error, where string) error { | |
if errors.Is(err, spec.ErrNotFound) { | |
return err // pass through | |
} | |
if sqErr, isSq := err.(sqlite3.Error); isSq { | |
if sqErr.Code == sqlite3.ErrConstraint { | |
// MUST detect 'AlreadyExists' to fulfil the Store contract! | |
// Constraint violation, e.g. a duplicate key. | |
return spec.ErrAlreadyExists | |
} | |
if sqErr.Code == sqlite3.ErrBusy || sqErr.Code == sqlite3.ErrLocked { | |
// SQLite has a single-writer policy, even in WAL (write-ahead) mode. | |
// SQLite will return BUSY if the database is locked by another connection. | |
// We treat this as a transient database conflict, and the caller should retry. | |
return spec.WrapErr(spec.ErrDBConflict, fmt.Sprintf("SQLiteStore: db-conflict: %s", where), err) | |
} | |
} | |
return spec.WrapErr(spec.ErrDBProblem, fmt.Sprintf("SQLiteStore: db-problem: %s", where), err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment