-
-
Save dfrankland/7c98dd7de69e1f81d739e93c26ac54cb to your computer and use it in GitHub Desktop.
Sample pulse oximeter with Dqlite and Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"database/sql" | |
"fmt" | |
"io" | |
"log" | |
"math/rand" | |
"net/http" | |
"os" | |
"strconv" | |
"time" | |
"github.com/canonical/go-dqlite" | |
"github.com/canonical/go-dqlite/client" | |
"github.com/canonical/go-dqlite/driver" | |
_ "github.com/mattn/go-sqlite3" | |
) | |
func startEngine(id uint64, address string) { | |
dir := fmt.Sprintf("./oximeter-data-%d", id) | |
os.Mkdir(dir, 0755) | |
node, err := dqlite.New( | |
id, address, dir, | |
dqlite.WithBindAddress(address), | |
dqlite.WithNetworkLatency(10*time.Millisecond), | |
) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if err := node.Start(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func joinCluster(id uint64, address string) { | |
if id == 1 { | |
return | |
} | |
cli, err := client.New(context.Background(), "127.0.0.1:9001") | |
if err == nil { | |
cli.Add(context.Background(), client.NodeInfo{ | |
ID: uint64(id), | |
Address: address, | |
}) | |
cli.Close() | |
} | |
} | |
func registerDriver() { | |
store := client.NewInmemNodeStore() | |
store.Set(context.Background(), []client.NodeInfo{ | |
{Address: "127.0.0.1:9001"}, | |
{Address: "127.0.0.1:9002"}, | |
{Address: "127.0.0.1:9003"}, | |
}) | |
driver, err := driver.New(store) | |
if err != nil { | |
log.Fatal(err) | |
} | |
sql.Register("dqlite", driver) | |
} | |
func isFatalError(err error) bool { | |
if err == nil { | |
return false | |
} | |
if err, ok := err.(driver.Error); ok { | |
return err.Code != driver.ErrBusy | |
} | |
return true | |
} | |
func getDatabase(id uint64) *sql.DB { | |
address := fmt.Sprintf("127.0.0.1:900%d", id) | |
startEngine(id, address) | |
joinCluster(id, address) | |
registerDriver() | |
db, err := sql.Open("dqlite", "oximeter.db") | |
if err != nil { | |
log.Fatal(err) | |
} | |
_, err = db.Exec( | |
"CREATE TABLE IF NOT EXISTS saturation " + | |
"(value FLOAT, time DATETIME DEFAULT CURRENT_TIMESTAMP)", | |
) | |
if isFatalError(err) { | |
log.Fatal(err) | |
} | |
return db | |
} | |
// measureSaturation produces a simulated oxygen-saturation reading: 95.0 plus
// five times a pseudo-random fraction drawn from rand.Float64.
func measureSaturation() float64 {
	fraction := rand.Float64()
	reading := 95.0 + 5*fraction
	return reading
}
func persistSaturation(db *sql.DB, value float64) { | |
_, err := db.Exec("INSERT INTO saturation (value) VALUES(?)", value) | |
if isFatalError(err) { | |
log.Fatal(err) | |
} | |
} | |
// retrieveAverageSaturation returns the average of the saturation values
// whose time column is at or after now (UTC) minus tail.
//
// When no rows fall inside the window the SQL avg() aggregate yields NULL;
// in that case the function returns 0 instead of failing. Any other query or
// scan error terminates the process via log.Fatal.
func retrieveAverageSaturation(db *sql.DB, tail time.Duration) float64 {
	since := time.Now().UTC().Add(-tail)
	row := db.QueryRow(
		"SELECT avg(value) FROM saturation WHERE time >= ?", since)

	// NullFloat64 accepts the NULL produced by avg() over an empty set, which
	// a plain float64 destination would reject with a scan error.
	var average sql.NullFloat64
	if err := row.Scan(&average); err != nil {
		log.Fatal(err)
	}
	if !average.Valid {
		return 0
	}
	return average.Float64
}
func main() { | |
id, _ := strconv.Atoi(os.Args[1]) | |
db := getDatabase(uint64(id)) | |
defer db.Close() | |
go func() { | |
for { | |
persistSaturation(db, measureSaturation()) | |
time.Sleep(30 * time.Second) | |
} | |
}() | |
port := fmt.Sprintf(":808%d", id) | |
http.ListenAndServe(port, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
tail, err := time.ParseDuration(r.URL.Query()["tail"][0]) | |
if err != nil { | |
log.Fatal(err) | |
} | |
io.WriteString(w, fmt.Sprintf("%f\n", retrieveAverageSaturation(db, tail))) | |
})) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment