Skip to content

Instantly share code, notes, and snippets.

@raffecat
Last active October 8, 2024 05:09
Show Gist options
  • Save raffecat/58961e86abe0c044e24193de9fe06947 to your computer and use it in GitHub Desktop.
Save raffecat/58961e86abe0c044e24193de9fe06947 to your computer and use it in GitHub Desktop.
Database migration from DogeNet
// initial schema is version 1
const SQL_SCHEMA string = `
CREATE TABLE IF NOT EXISTS announce ...
`
const SQL_MIGRATION_v2 string = `
ALTER TABLE announce ADD COLUMN owner BLOB
`
var MIGRATIONS = []struct {
ver int
query string
}{
{2, SQL_MIGRATION_v2},
}
func (s *SQLiteStore) initSchema(ctx context.Context) error {
// doTxn is optional, this is what DogeNet does.
return s.doTxn("init schema", func(tx *sql.Tx) error {
// apply migrations
verRow := tx.QueryRow("SELECT version FROM migration LIMIT 1")
var version int
err := verRow.Scan(&version)
if err != nil {
// first-time database init.
// init schema (idempotent)
_, err := tx.Exec(SQL_SCHEMA)
if err != nil {
return dbErr(err, "creating database schema")
}
// set up version table (idempotent)
err = tx.QueryRow("SELECT version FROM migration LIMIT 1").Scan(&version)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
version = 1
_, err = tx.Exec("INSERT INTO migration (version) VALUES (?)", version)
if err != nil {
return dbErr(err, "updating version")
}
} else {
return dbErr(err, "querying version")
}
}
// set up config table (idempotent)
// this is application-specific.
var dayc int
err = tx.QueryRow("SELECT dayc FROM config LIMIT 1").Scan(&dayc)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
_, err = tx.Exec("INSERT INTO config (dayc,last) VALUES (1,?)", unixDayStamp())
if err != nil {
return dbErr(err, "inserting config row")
}
} else {
return dbErr(err, "querying config")
}
}
}
// apply all new migrations.
initVer := version
for _, m := range MIGRATIONS {
if version < m.ver {
_, err = tx.Exec(m.query)
if err != nil {
return dbErr(err, fmt.Sprintf("applying migration %v", m.ver))
}
version = m.ver
}
}
if version != initVer {
_, err = tx.Exec("UPDATE migration SET version=?", version)
if err != nil {
return dbErr(err, "updating version")
}
}
return nil
})
}
// The Store implementation below uses a Cancellable Context
// to allow clean shutdown, especially because doTxn retries
// transactions on database conflics (competing queries)
type SQLiteStore struct {
db *sql.DB
ctx context.Context
}
// NewSQLiteStore returns a spec.Store implementation that uses SQLite
func NewSQLiteStore(fileName string, ctx context.Context) (spec.Store, error) {
db, err := sql.Open("sqlite3", fileName)
store := &SQLiteStore{db: db, ctx: ctx}
if err != nil {
return store, dbErr(err, "opening database")
}
db.SetMaxOpenConns(1) // limit concurrent access to avoid "database locked"
err = store.initSchema(ctx)
return store, err
}
func (s SQLiteStore) doTxn(name string, work func(tx *sql.Tx) error) error {
limit := 120
for {
tx, err := s.db.Begin()
if err != nil {
if IsConflict(err) {
s.Sleep(250 * time.Millisecond)
limit--
if limit != 0 {
continue
}
}
return dbErr(err, "cannot begin transaction: "+name)
}
defer tx.Rollback()
err = work(tx)
if err != nil {
if IsConflict(err) {
s.Sleep(250 * time.Millisecond)
limit--
if limit != 0 {
continue
}
}
return err
}
err = tx.Commit()
if err != nil {
if IsConflict(err) {
s.Sleep(250 * time.Millisecond)
limit--
if limit != 0 {
continue
}
}
return dbErr(err, "cannot commit: "+name)
}
return nil
}
}
func (s SQLiteStore) Sleep(dur time.Duration) {
select {
case <-s.ctx.Done():
case <-time.After(dur):
}
}
func IsConflict(err error) bool {
if sqErr, isSq := err.(sqlite3.Error); isSq {
if sqErr.Code == sqlite3.ErrBusy || sqErr.Code == sqlite3.ErrLocked {
return true
}
}
return false
}
func dbErr(err error, where string) error {
if errors.Is(err, spec.ErrNotFound) {
return err // pass through
}
if sqErr, isSq := err.(sqlite3.Error); isSq {
if sqErr.Code == sqlite3.ErrConstraint {
// MUST detect 'AlreadyExists' to fulfil the Store contract!
// Constraint violation, e.g. a duplicate key.
return spec.ErrAlreadyExists
}
if sqErr.Code == sqlite3.ErrBusy || sqErr.Code == sqlite3.ErrLocked {
// SQLite has a single-writer policy, even in WAL (write-ahead) mode.
// SQLite will return BUSY if the database is locked by another connection.
// We treat this as a transient database conflict, and the caller should retry.
return spec.WrapErr(spec.ErrDBConflict, fmt.Sprintf("SQLiteStore: db-conflict: %s", where), err)
}
}
return spec.WrapErr(spec.ErrDBProblem, fmt.Sprintf("SQLiteStore: db-problem: %s", where), err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment