Skip to content

Instantly share code, notes, and snippets.

@bruth
Last active April 12, 2024 05:02
Show Gist options
  • Save bruth/bfd84fdfc855e79c9e79 to your computer and use it in GitHub Desktop.
Save bruth/bfd84fdfc855e79c9e79 to your computer and use it in GitHub Desktop.
SQLite update hook example in Go

SQLite Update Hook Example

This is an example of registering an update hook (`update_hook`) on a SQLite connection. The motivation for exploring this feature is to test out various implementations of data monitoring interfaces.

A few notable properties of the implementation:

  • The hook must be registered on the connection being used, which requires clients to manually integrate this code.
  • Each INSERT and UPDATE operation requires a subsequent SELECT to get the row data.
  • When registering the hook, increasing the bufsize under heavy workloads will improve throughput, but the SQLite library is single-threaded by design.
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"code.google.com/p/go-sqlite/go1/sqlite3"
)
// OpString maps an update-hook operation code to a lowercase name.
//
// It yields "insert", "update" or "delete" for sqlite3.INSERT, sqlite3.UPDATE
// and sqlite3.DELETE respectively, and the empty string for any other code.
func OpString(op int) string {
	if op == sqlite3.INSERT {
		return "insert"
	}
	if op == sqlite3.UPDATE {
		return "update"
	}
	if op == sqlite3.DELETE {
		return "delete"
	}
	return ""
}
// RowByID returns the row of table tbl whose rowid equals rid.
//
// The table name is emitted as a double-quoted SQL identifier, so names that
// are keywords or contain spaces or quotes are looked up correctly. The row is
// returned as a map filled in by the statement's Scan method.
//
// A non-nil error is returned when the query cannot be run or when scanning
// the row fails; the returned map is nil in that case.
func RowByID(conn *sqlite3.Conn, tbl string, rid int64) (sqlite3.RowMap, error) {
	sql := fmt.Sprintf("select * from %s where rowid = $rid", quoteIdent(tbl))
	stmt, err := conn.Query(sql, sqlite3.NamedArgs{"$rid": rid})
	if err != nil {
		return nil, err
	}
	// Release the prepared statement once the row has been read instead of
	// leaving it open on the connection.
	defer stmt.Close()

	// Get a row map back.
	row := sqlite3.RowMap{}
	if err := stmt.Scan(row); err != nil {
		return nil, err
	}
	return row, nil
}

// quoteIdent wraps name in double quotes and doubles any embedded double
// quote, producing a quoted SQL identifier.
func quoteIdent(name string) string {
	quoted := make([]byte, 0, len(name)+2)
	quoted = append(quoted, '"')
	for i := 0; i < len(name); i++ {
		if name[i] == '"' {
			quoted = append(quoted, '"')
		}
		quoted = append(quoted, name[i])
	}
	quoted = append(quoted, '"')
	return string(quoted)
}
// RegisterUpdateHandler registers an update handler on conn and returns a
// channel that receives one payload for every row that is inserted, updated,
// or deleted.
//
// bufsize is the capacity of the returned channel. The handler sends on the
// channel from inside the callback, so once the buffer is full the callback
// blocks until a receiver takes a payload.
//
// Each payload carries the keys "operation" (see OpString), "database",
// "table", "row" (the rowid), "record" and "error". For operations other than
// delete the row is looked up with RowByID: on success "record" holds the
// result, on failure "error" holds the error. Both stay nil for deletes.
//
// This function never closes the channel.
//
// NOTE(review): the row lookup runs on the same connection from inside the
// hook. SQLite's sqlite3_update_hook documentation says the callback must not
// modify the invoking connection and counts prepare/step as modification —
// confirm this is acceptable for the driver in use.
func RegisterUpdateHandler(conn *sqlite3.Conn, bufsize int) chan map[string]interface{} {
	ch := make(chan map[string]interface{}, bufsize)

	// Note, this overrides the previously registered handler.
	conn.UpdateFunc(func(op int, rdb, rtbl sqlite3.RawString, rid int64) {
		// Take copies of the raw strings before using them outside the
		// driver (presumably they are only valid during the callback — TODO
		// confirm against the driver documentation).
		db := rdb.Copy()
		tbl := rtbl.Copy()

		// All per-event state lives inside the callback so that nothing is
		// shared between invocations.
		pl := map[string]interface{}{
			"operation": OpString(op),
			"database":  db,
			"table":     tbl,
			"row":       rid,
			"record":    nil,
			"error":     nil,
		}

		// Get the record for insert and update operations.
		if op != sqlite3.DELETE {
			if rm, err := RowByID(conn, tbl, rid); err != nil {
				pl["error"] = err
			} else {
				pl["record"] = rm
			}
		}

		ch <- pl
	})

	return ch
}
// Handler processes a single payload and reports failure through its error
// result. Consumer calls its Handler once for every payload it receives.
type Handler func(map[string]interface{}) error
// DefaultHandler ignores the payload it is given and always reports success.
// NewConsumer falls back to it when no handler is supplied.
func DefaultHandler(_ map[string]interface{}) error {
	return nil
}
// LogHandler writes the payload to the standard logger as tab-indented JSON.
//
// Values that implement error (and not json.Marshaler) are logged as their
// message text. encoding/json would otherwise encode a typical error value,
// which has no exported fields, as "{}", hiding the failure. The caller's map
// is never modified.
//
// It returns the marshalling error, if any; nothing is logged in that case.
func LogHandler(r map[string]interface{}) error {
	encoded, err := json.MarshalIndent(stringifyErrors(r), "", "\t")
	if err != nil {
		return err
	}
	log.Print(string(encoded))
	return nil
}

// stringifyErrors returns r itself when it holds no plain error values, and
// otherwise a shallow copy in which each such value is replaced by its
// Error() text.
func stringifyErrors(r map[string]interface{}) map[string]interface{} {
	var out map[string]interface{}
	for k, v := range r {
		e, isErr := v.(error)
		if !isErr {
			continue
		}
		// Respect values that already know how to encode themselves.
		if _, custom := v.(json.Marshaler); custom {
			continue
		}
		if out == nil {
			// Copy lazily so payloads without errors are marshalled as-is.
			out = make(map[string]interface{}, len(r))
			for key, val := range r {
				out[key] = val
			}
		}
		out[k] = e.Error()
	}
	if out == nil {
		return r
	}
	return out
}
// Consumer drains a channel of payloads on a background goroutine, passing
// each payload to Handle. Create one with NewConsumer.
type Consumer struct {
	// Handle is called once for every payload received from the channel.
	Handle Handler

	// ready is closed by the consuming goroutine as soon as it is running.
	ready chan struct{}
	// done is closed after the payload channel has been closed and drained.
	done chan struct{}
	// ch is the channel payloads are read from.
	ch <-chan map[string]interface{}
}

// Start starts a goroutine to consume the channel and returns once that
// goroutine is running.
//
// The goroutine ends when the payload channel is closed and every remaining
// payload has been handled. An error returned by Handle is logged and does not
// stop consumption. Start closes the ready channel, so it must not be called
// more than once on the same Consumer.
func (c *Consumer) Start() {
	// Run a goroutine to consume the channel.
	go func() {
		close(c.ready)
		for pl := range c.ch {
			if err := c.Handle(pl); err != nil {
				// Report the failure instead of dropping it silently.
				log.Printf("consumer: handling payload: %v", err)
			}
		}
		close(c.done)
	}()
	<-c.ready
}

// Wait waits until the consumer is done handling the payloads, i.e. until the
// goroutine launched by Start has drained the closed payload channel.
func (c *Consumer) Wait() {
	<-c.done
}

// NewConsumer returns a Consumer that reads payloads from ch and passes them
// to handler. A nil handler is replaced by DefaultHandler. The consumer does
// nothing until Start is called.
func NewConsumer(ch <-chan map[string]interface{}, handler Handler) *Consumer {
	if handler == nil {
		handler = DefaultHandler
	}
	return &Consumer{
		Handle: handler,
		ready:  make(chan struct{}),
		done:   make(chan struct{}),
		ch:     ch,
	}
}
// Example usage: open an in-memory database, register the update handler,
// log every emitted payload while the example statements run, then shut the
// consumer down.
func main() {
	conn, err := sqlite3.Open(":memory:")
	if err != nil {
		log.Fatal(err)
	}
	// Runs after the consumer has finished, when main returns.
	defer conn.Close()

	// Register the handler with a buffer size of 10.
	ch := RegisterUpdateHandler(conn, 10)

	// Initialize a consumer for the channel.
	cs := NewConsumer(ch, LogHandler)

	// Starts the consumer.
	cs.Start()

	// Run the example.
	example(conn)

	// Close the channel to denote the work is done.
	close(ch)

	// Wait until the consumer is done now that the channel is closed.
	cs.Wait()
}
// example runs a create/insert/update/delete sequence against conn so that an
// update handler registered on the connection fires for each row change.
//
// Progress is reported through the standard logger. The first statement that
// fails ends the program via log.Fatal.
func example(conn *sqlite3.Conn) {
	steps := []struct {
		sql  string        // statement to execute
		args []interface{} // arguments passed to Exec, if any
		done string        // message logged once the statement succeeds
	}{
		{
			sql: `create table "people" (
	id int primary key,
	name varchar(100) not null,
	email varchar(100),
	dob date
)`,
			done: "created table",
		},
		{
			sql: `insert into "people" (id, name) values ($id, $name)`,
			args: []interface{}{sqlite3.NamedArgs{
				"$id":   1,
				"$name": "Joe Smith",
			}},
			done: "inserted row",
		},
		{
			// Restrict the update to the inserted row, matching the insert
			// and delete statements, rather than touching every row.
			sql: `update "people" set email = $email, dob = $dob where id = $id`,
			args: []interface{}{sqlite3.NamedArgs{
				"$id": 1,
				// Placeholder address on a reserved example domain.
				"$email": "joe.smith@example.com",
				"$dob":   time.Date(1964, time.March, 23, 0, 0, 0, 0, time.UTC),
			}},
			done: "updated row",
		},
		{
			sql: `delete from "people" where id = $id`,
			args: []interface{}{sqlite3.NamedArgs{
				"$id": 1,
			}},
			done: "deleted row",
		},
	}

	// Perform the operations in order, stopping at the first failure.
	for _, s := range steps {
		if err := conn.Exec(s.sql, s.args...); err != nil {
			log.Fatal(err)
		}
		log.Println(s.done)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment