Skip to content

Instantly share code, notes, and snippets.

@freeekanayaka
Last active May 31, 2020 05:37
Show Gist options
  • Save freeekanayaka/18d3db39f6356a99d440c84792495a0b to your computer and use it in GitHub Desktop.
Save freeekanayaka/18d3db39f6356a99d440c84792495a0b to your computer and use it in GitHub Desktop.
Sample pulse oximeter with Dqlite and Go
package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"os"
"strconv"
"time"
"github.com/canonical/go-dqlite"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
_ "github.com/mattn/go-sqlite3"
)
// startEngine creates the on-disk data directory for node id and starts a
// dqlite node that is identified by id and bound to address.
//
// Any failure (creating the directory, creating the node, or starting it)
// terminates the process via log.Fatal.
func startEngine(id uint64, address string) {
	dir := fmt.Sprintf("./oximeter-data-%d", id)
	// MkdirAll succeeds when the directory already exists (e.g. on restart),
	// so any error reported here is a real problem worth stopping for.
	if err := os.MkdirAll(dir, 0755); err != nil {
		log.Fatal(err)
	}
	node, err := dqlite.New(
		id, address, dir,
		dqlite.WithBindAddress(address),
		dqlite.WithNetworkLatency(10*time.Millisecond),
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := node.Start(); err != nil {
		log.Fatal(err)
	}
}
// joinCluster asks the node listening on 127.0.0.1:9001 to add the node
// described by id and address to the cluster. Node 1 is skipped.
//
// Joining is best-effort: failures are logged rather than treated as fatal,
// so the process keeps running even when the request does not succeed.
func joinCluster(id uint64, address string) {
	if id == 1 {
		return
	}
	ctx := context.Background()
	cli, err := client.New(ctx, "127.0.0.1:9001")
	if err != nil {
		log.Printf("join cluster: connect to 127.0.0.1:9001: %v", err)
		return
	}
	defer cli.Close()

	info := client.NodeInfo{
		ID:      id,
		Address: address,
	}
	if err := cli.Add(ctx, info); err != nil {
		log.Printf("join cluster: add node %d (%s): %v", id, address, err)
	}
}
// registerDriver registers a database/sql driver named "dqlite" that is
// backed by an in-memory node store listing the three well-known local
// addresses 127.0.0.1:9001 through 127.0.0.1:9003.
//
// Failing to populate the store or to create the driver terminates the
// process via log.Fatal. Note that sql.Register panics if the same name is
// registered twice, so this must be called only once per process.
func registerDriver() {
	store := client.NewInmemNodeStore()
	err := store.Set(context.Background(), []client.NodeInfo{
		{Address: "127.0.0.1:9001"},
		{Address: "127.0.0.1:9002"},
		{Address: "127.0.0.1:9003"},
	})
	if err != nil {
		log.Fatal(err)
	}
	// Named drv (not driver) so the imported driver package is not shadowed.
	drv, err := driver.New(store)
	if err != nil {
		log.Fatal(err)
	}
	sql.Register("dqlite", drv)
}
// isFatalError classifies err: nil is not fatal, a driver.Error is fatal
// unless its code is driver.ErrBusy, and every other error is fatal.
func isFatalError(err error) bool {
	switch e := err.(type) {
	case nil:
		return false
	case driver.Error:
		// A busy database is the one driver condition callers tolerate.
		return e.Code != driver.ErrBusy
	default:
		return true
	}
}
// getDatabase brings up the local node for id, joins it to the cluster,
// registers the "dqlite" SQL driver, opens oximeter.db and makes sure the
// saturation table exists. Errors that isFatalError deems fatal, as well as
// a failure to open the database, terminate the process.
func getDatabase(id uint64) *sql.DB {
	const schema = "CREATE TABLE IF NOT EXISTS saturation " +
		"(value FLOAT, time DATETIME DEFAULT CURRENT_TIMESTAMP)"

	// The node address is derived from the id: 127.0.0.1:900<id>.
	nodeAddr := fmt.Sprintf("127.0.0.1:900%d", id)
	startEngine(id, nodeAddr)
	joinCluster(id, nodeAddr)
	registerDriver()

	handle, openErr := sql.Open("dqlite", "oximeter.db")
	if openErr != nil {
		log.Fatal(openErr)
	}

	if _, execErr := handle.Exec(schema); isFatalError(execErr) {
		log.Fatal(execErr)
	}
	return handle
}
// measureSaturation simulates a sensor reading by returning a pseudo-random
// value in the half-open interval [95, 100).
func measureSaturation() float64 {
	const (
		floor = 95.0 // lowest simulated reading
		span  = 5    // width of the simulated range
	)
	return floor + span*rand.Float64()
}
// persistSaturation inserts value as a new row of the saturation table.
// An error that isFatalError reports as fatal terminates the process; any
// other outcome (including a non-fatal error) simply returns.
func persistSaturation(db *sql.DB, value float64) {
	const insertStmt = "INSERT INTO saturation (value) VALUES(?)"
	if _, execErr := db.Exec(insertStmt, value); isFatalError(execErr) {
		log.Fatal(execErr)
	}
}
// retrieveAverageSaturation returns the average of the saturation values
// whose time column is at or after now (UTC) minus tail.
//
// When no rows fall inside the window the SQL avg() aggregate yields NULL;
// in that case 0 is returned instead of aborting. Any other query or scan
// error terminates the process via log.Fatal.
func retrieveAverageSaturation(db *sql.DB, tail time.Duration) float64 {
	row := db.QueryRow(
		"SELECT avg(value) FROM saturation WHERE time >= ?", time.Now().UTC().Add(-tail))
	// NullFloat64 lets Scan accept the NULL produced by avg() over zero rows,
	// which a plain float64 destination would reject with an error.
	var average sql.NullFloat64
	if err := row.Scan(&average); err != nil {
		log.Fatal(err)
	}
	if !average.Valid {
		return 0
	}
	return average.Float64
}
// main starts one oximeter node. It expects the node id as the first
// command-line argument, opens the replicated database for that node,
// records a simulated saturation reading every 30 seconds in the
// background, and serves the average over a caller-supplied window on
// port 808<id>.
//
// HTTP usage: GET /?tail=<duration>, where <duration> is anything accepted
// by time.ParseDuration (for example "5m"). A missing or malformed tail
// yields a 400 response instead of taking the server down.
func main() {
	if len(os.Args) < 2 {
		log.Fatal("usage: oximeter <node-id>")
	}
	// The id is embedded as the last digit of both the node port (900<id>)
	// and the HTTP port (808<id>), so only 1 through 9 produce valid ports.
	id, err := strconv.Atoi(os.Args[1])
	if err != nil || id < 1 || id > 9 {
		log.Fatalf("invalid node id %q: must be an integer between 1 and 9", os.Args[1])
	}

	db := getDatabase(uint64(id))

	// Background sampler; it runs for the lifetime of the process.
	go func() {
		for {
			persistSaturation(db, measureSaturation())
			time.Sleep(30 * time.Second)
		}
	}()

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Get returns "" when the parameter is absent, which ParseDuration
		// rejects, so missing and malformed values share one error path.
		raw := r.URL.Query().Get("tail")
		tail, err := time.ParseDuration(raw)
		if err != nil {
			http.Error(w, fmt.Sprintf("invalid tail parameter %q: %v", raw, err), http.StatusBadRequest)
			return
		}
		io.WriteString(w, fmt.Sprintf("%f\n", retrieveAverageSaturation(db, tail)))
	})

	port := fmt.Sprintf(":808%d", id)
	// ListenAndServe only returns on failure. Close the database explicitly
	// before exiting because log.Fatal skips deferred calls.
	err = http.ListenAndServe(port, handler)
	db.Close()
	log.Fatal(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment