Last active
May 31, 2020 05:37
-
-
Save freeekanayaka/18d3db39f6356a99d440c84792495a0b to your computer and use it in GitHub Desktop.
Sample pulse oximeter with Dqlite and Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"database/sql" | |
"fmt" | |
"io" | |
"log" | |
"math/rand" | |
"net/http" | |
"os" | |
"strconv" | |
"time" | |
"github.com/canonical/go-dqlite" | |
"github.com/canonical/go-dqlite/client" | |
"github.com/canonical/go-dqlite/driver" | |
_ "github.com/mattn/go-sqlite3" | |
) | |
func startEngine(id uint64, address string) { | |
dir := fmt.Sprintf("./oximeter-data-%d", id) | |
os.Mkdir(dir, 0755) | |
node, err := dqlite.New( | |
id, address, dir, | |
dqlite.WithBindAddress(address), | |
dqlite.WithNetworkLatency(10*time.Millisecond), | |
) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if err := node.Start(); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func joinCluster(id uint64, address string) { | |
if id == 1 { | |
return | |
} | |
cli, err := client.New(context.Background(), "127.0.0.1:9001") | |
if err == nil { | |
cli.Add(context.Background(), client.NodeInfo{ | |
ID: uint64(id), | |
Address: address, | |
}) | |
cli.Close() | |
} | |
} | |
func registerDriver() { | |
store := client.NewInmemNodeStore() | |
store.Set(context.Background(), []client.NodeInfo{ | |
{Address: "127.0.0.1:9001"}, | |
{Address: "127.0.0.1:9002"}, | |
{Address: "127.0.0.1:9003"}, | |
}) | |
driver, err := driver.New(store) | |
if err != nil { | |
log.Fatal(err) | |
} | |
sql.Register("dqlite", driver) | |
} | |
func isFatalError(err error) bool { | |
if err == nil { | |
return false | |
} | |
if err, ok := err.(driver.Error); ok { | |
return err.Code != driver.ErrBusy | |
} | |
return true | |
} | |
func getDatabase(id uint64) *sql.DB { | |
address := fmt.Sprintf("127.0.0.1:900%d", id) | |
startEngine(id, address) | |
joinCluster(id, address) | |
registerDriver() | |
db, err := sql.Open("dqlite", "oximeter.db") | |
if err != nil { | |
log.Fatal(err) | |
} | |
_, err = db.Exec( | |
"CREATE TABLE IF NOT EXISTS saturation " + | |
"(value FLOAT, time DATETIME DEFAULT CURRENT_TIMESTAMP)", | |
) | |
if isFatalError(err) { | |
log.Fatal(err) | |
} | |
return db | |
} | |
// measureSaturation returns a simulated saturation reading: a pseudo-random
// value in the half-open interval [95.0, 100.0).
func measureSaturation() float64 {
	const (
		floor = 95.0
		span  = 5
	)
	return floor + span*rand.Float64()
}
func persistSaturation(db *sql.DB, value float64) { | |
_, err := db.Exec("INSERT INTO saturation (value) VALUES(?)", value) | |
if isFatalError(err) { | |
log.Fatal(err) | |
} | |
} | |
// retrieveAverageSaturation returns the average of the saturation values
// whose time column is at or after now (UTC) minus tail.
//
// When no row matches, avg() evaluates to SQL NULL; in that case 0 is
// returned instead of treating the NULL as a scan failure. Any other query or
// scan error terminates the process via log.Fatal.
func retrieveAverageSaturation(db *sql.DB, tail time.Duration) float64 {
	since := time.Now().UTC().Add(-tail)
	row := db.QueryRow(
		"SELECT avg(value) FROM saturation WHERE time >= ?", since)

	// NullFloat64 accepts the NULL produced by avg() over an empty set,
	// which a plain float64 destination would reject.
	var average sql.NullFloat64
	if err := row.Scan(&average); err != nil {
		log.Fatal(err)
	}
	if !average.Valid {
		return 0
	}
	return average.Float64
}
func main() { | |
id, _ := strconv.Atoi(os.Args[1]) | |
db := getDatabase(uint64(id)) | |
defer db.Close() | |
go func() { | |
for { | |
persistSaturation(db, measureSaturation()) | |
time.Sleep(30 * time.Second) | |
} | |
}() | |
port := fmt.Sprintf(":808%d", id) | |
http.ListenAndServe(port, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
tail, err := time.ParseDuration(r.URL.Query()["tail"][0]) | |
if err != nil { | |
log.Fatal(err) | |
} | |
io.WriteString(w, fmt.Sprintf("%f\n", retrieveAverageSaturation(db, tail))) | |
})) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment