Last active
June 15, 2023 20:37
-
-
Save corny/0c2a2372aaeac0e4a4bcba3e940ab114 to your computer and use it in GitHub Desktop.
SQLite journal in Go (golang)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Stores entries in a local SQLite database | |
until they have been processed by a submit function. | |
*/ | |
package journal | |
import ( | |
"database/sql" | |
_ "github.com/mattn/go-sqlite3" | |
"sync" | |
"time" | |
) | |
const batchSize = 100 | |
var ( | |
submitFunc SubmitFunc | |
db *sql.DB | |
stmtAdd *sql.Stmt | |
stmtSelect *sql.Stmt | |
stmtDelete *sql.Stmt | |
queue = make(chan *Entry, 100) // enqueued entries for the local database | |
cond = sync.Cond{L: &sync.Mutex{}} | |
enqueued uint | |
wg sync.WaitGroup // for Close() | |
stopped bool | |
) | |
// Function that submits local entries to the remote journal | |
// The local entries are deleted if the function returns true | |
type SubmitFunc func([]Entry) bool | |
type Entry struct { | |
Id int64 | |
Data []byte | |
} | |
func Start(dbpath string, f SubmitFunc) (err error) { | |
if db != nil { | |
panic("journal already started") | |
} | |
db, err = sql.Open("sqlite3", dbpath) | |
if err != nil { | |
panic("fehler") | |
return err | |
} | |
submitFunc = f | |
// Create table if not exists | |
_, err = db.Exec(` | |
CREATE TABLE IF NOT EXISTS entries( | |
Id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, | |
Data BLOB NOT NULL | |
)`) | |
if err != nil { | |
db.Close() | |
db = nil | |
panic(err) | |
return err | |
} | |
// CREATE statement | |
stmtAdd = prepareStmt(`INSERT INTO entries (Data) VALUES(?)`) | |
// SELECT statement | |
stmtSelect = prepareStmt(`SELECT Id, Data FROM entries ORDER BY Id LIMIT ?`) | |
// DELETE statement | |
stmtDelete = prepareStmt(`DELETE FROM entries WHERE id <= ?`) | |
// Start go routine that stores entries in the local journal | |
wg.Add(1) | |
go func() { | |
for entry := range queue { | |
entry.store() | |
} | |
wg.Done() | |
}() | |
// count stored entries | |
row := db.QueryRow("SELECT COUNT(*) from entries") | |
panicOnErr(row.Scan(&enqueued)) | |
// start worker | |
go submitWorker() | |
return nil | |
} | |
func Stop() { | |
// stop submit worker | |
stopped = true | |
cond.Signal() | |
// stop accepting new entries | |
close(queue) | |
wg.Wait() | |
// Remove database objects | |
stmtAdd.Close() | |
stmtSelect.Close() | |
stmtDelete.Close() | |
db.Close() | |
} | |
// Enqueues an entry | |
func Add(data []byte) { | |
queue <- &Entry{Data: data} | |
} | |
func submitWorker() { | |
for { | |
// Wait for entries or stop | |
cond.L.Lock() | |
if enqueued == 0 && !stopped { | |
cond.Wait() | |
} | |
cond.L.Unlock() | |
if stopped { | |
break | |
} | |
submitEntries() | |
} | |
} | |
func submitEntries() { | |
rows, err := stmtSelect.Query(batchSize) | |
panicOnErr(err) | |
defer rows.Close() | |
entries := make([]Entry, 0, batchSize) | |
for rows.Next() { | |
entry := Entry{} | |
err := rows.Scan(&entry.Id, &entry.Data) | |
if err != nil { | |
panic(err) | |
} | |
entries = append(entries, entry) | |
} | |
if len(entries) == 0 { | |
return | |
} | |
if !submitFunc(entries) { | |
time.Sleep(time.Second * 5) | |
return | |
} | |
// remove submitted items | |
_, err = stmtDelete.Exec(entries[len(entries)-1].Id) | |
panicOnErr(err) | |
// update counter | |
cond.L.Lock() | |
enqueued -= uint(len(entries)) | |
cond.L.Unlock() | |
} | |
// Stores an entry in the local database | |
func (entry *Entry) store() { | |
_, err := stmtAdd.Exec(entry.Data) | |
panicOnErr(err) | |
// notify worker | |
cond.L.Lock() | |
enqueued++ | |
cond.L.Unlock() | |
cond.Signal() | |
} | |
func panicOnErr(err error) { | |
if err != nil { | |
panic(err) | |
} | |
} | |
func prepareStmt(sql string) *sql.Stmt { | |
stmt, err := db.Prepare(sql) | |
panicOnErr(err) | |
return stmt | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package journal | |
import ( | |
"github.com/stretchr/testify/assert" | |
"io/ioutil" | |
"os" | |
"testing" | |
"time" | |
) | |
var testResult []string | |
func TestJournal(t *testing.T) { | |
assert := assert.New(t) | |
file, err := ioutil.TempFile(os.TempDir(), "journal") | |
assert.NoError(err) | |
defer os.Remove(file.Name()) | |
assert.NoError(Start(file.Name(), testSubmit)) | |
Add([]byte("foo")) | |
waitForResultCount(1) | |
Add([]byte("bar")) | |
waitForResultCount(2) | |
Stop() | |
assert.EqualValues([]string{"foo", "bar"}, testResult) | |
} | |
func waitForResultCount(expected int) { | |
for i := 0; len(testResult) != expected && i < 1000; i++ { | |
time.Sleep(time.Millisecond) | |
} | |
} | |
func testSubmit(entries []Entry) bool { | |
for _, entry := range entries { | |
testResult = append(testResult, string(entry.Data)) | |
} | |
return true | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment