|
package main |
|
|
|
import ( |
|
"encoding/json" |
|
"fmt" |
|
"log" |
|
"time" |
|
|
|
"code.google.com/p/go-sqlite/go1/sqlite3" |
|
) |
|
|
|
// OpString returns a string representation of update operation. |
|
func OpString(op int) string { |
|
switch op { |
|
case sqlite3.INSERT: |
|
return "insert" |
|
|
|
case sqlite3.UPDATE: |
|
return "update" |
|
|
|
case sqlite3.DELETE: |
|
return "delete" |
|
|
|
default: |
|
return "" |
|
} |
|
} |
|
|
|
// RowByID returns a row in a table for the specified row ID. |
|
func RowByID(conn *sqlite3.Conn, tbl string, rid int64) (sqlite3.RowMap, error) { |
|
sql := fmt.Sprintf("select * from %s where rowid = $rid", tbl) |
|
|
|
stmt, err := conn.Query(sql, sqlite3.NamedArgs{"$rid": rid}) |
|
|
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Get a row map back. |
|
row := sqlite3.RowMap{} |
|
|
|
if err = stmt.Scan(row); err != nil { |
|
return nil, err |
|
} |
|
|
|
return row, nil |
|
} |
|
|
|
// RegisterUpdateHandler takes a connection and registers an update handler with |
|
// it to emit records that are inserted, updated, or deleted. |
|
func RegisterUpdateHandler(conn *sqlite3.Conn, bufsize int) chan map[string]interface{} { |
|
var ( |
|
err error |
|
pl map[string]interface{} |
|
rm sqlite3.RowMap |
|
|
|
ch = make(chan map[string]interface{}, bufsize) |
|
) |
|
|
|
// Note, this overrides the previously registered handler. |
|
conn.UpdateFunc(func(op int, rdb, rtbl sqlite3.RawString, rid int64) { |
|
db := rdb.Copy() |
|
tbl := rtbl.Copy() |
|
|
|
pl = map[string]interface{}{ |
|
"operation": OpString(op), |
|
"database": db, |
|
"table": tbl, |
|
"row": rid, |
|
"record": nil, |
|
"error": nil, |
|
} |
|
|
|
// Get the record for insert and update operations. |
|
if op != sqlite3.DELETE { |
|
rm, err = RowByID(conn, tbl, rid) |
|
|
|
if err != nil { |
|
pl["error"] = err |
|
} else { |
|
pl["record"] = rm |
|
} |
|
} |
|
|
|
ch <- pl |
|
}) |
|
|
|
return ch |
|
} |
|
|
|
type Handler func(map[string]interface{}) error |
|
|
|
// DefaultHandler is a no-op. |
|
func DefaultHandler(r map[string]interface{}) error { |
|
return nil |
|
} |
|
|
|
// LogHandler logs a JSON encoded string. |
|
func LogHandler(r map[string]interface{}) error { |
|
bytes, err := json.MarshalIndent(r, "", "\t") |
|
|
|
if err != nil { |
|
return err |
|
} |
|
|
|
log.Print(string(bytes)) |
|
|
|
return nil |
|
} |
|
|
|
type Consumer struct { |
|
Handle Handler |
|
|
|
ready chan struct{} |
|
done chan struct{} |
|
ch <-chan map[string]interface{} |
|
} |
|
|
|
// Start starts a goroutine to consume the channel. |
|
func (c *Consumer) Start() { |
|
// Run a goroutine to consume the channel. |
|
go func() { |
|
close(c.ready) |
|
|
|
for pl := range c.ch { |
|
c.Handle(pl) |
|
} |
|
|
|
close(c.done) |
|
}() |
|
|
|
<-c.ready |
|
} |
|
|
|
// Wait waits until the consumer is done handling the payloads. |
|
func (c *Consumer) Wait() { |
|
<-c.done |
|
} |
|
|
|
func NewConsumer(ch <-chan map[string]interface{}, handler Handler) *Consumer { |
|
if handler == nil { |
|
handler = DefaultHandler |
|
} |
|
|
|
return &Consumer{ |
|
Handle: handler, |
|
ready: make(chan struct{}), |
|
done: make(chan struct{}), |
|
ch: ch, |
|
} |
|
} |
|
|
|
// Example usage. |
|
func main() { |
|
conn, _ := sqlite3.Open(":memory:") |
|
|
|
// Register the handler with a buffer size of 10. |
|
ch := RegisterUpdateHandler(conn, 10) |
|
|
|
// Initialize a consumer for the channel. |
|
cs := NewConsumer(ch, LogHandler) |
|
|
|
// Starts the consumer. |
|
cs.Start() |
|
|
|
// Run the example. |
|
example(conn) |
|
|
|
// Close the channel to denote the work is done. |
|
close(ch) |
|
|
|
// Wait until the consumer is done now that the channel is closed. |
|
cs.Wait() |
|
} |
|
|
|
func example(conn *sqlite3.Conn) { |
|
var err error |
|
|
|
// Perform some operations. |
|
err = conn.Exec(`create table "people" ( |
|
id int primary key, |
|
name varchar(100) not null, |
|
email varchar(100), |
|
dob date |
|
)`) |
|
|
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("created table") |
|
|
|
err = conn.Exec(`insert into "people" (id, name) values ($id, $name)`, sqlite3.NamedArgs{ |
|
"$id": 1, |
|
"$name": "Joe Smith", |
|
}) |
|
|
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("inserted row") |
|
|
|
err = conn.Exec(`update "people" set email = $email, dob = $dob`, sqlite3.NamedArgs{ |
|
"$email": "[email protected]", |
|
"$dob": time.Date(1964, time.March, 23, 0, 0, 0, 0, time.UTC), |
|
}) |
|
|
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("updated row") |
|
|
|
err = conn.Exec(`delete from "people" where id = $id`, sqlite3.NamedArgs{ |
|
"$id": 1, |
|
}) |
|
|
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("deleted row") |
|
} |