Skip to content

Instantly share code, notes, and snippets.

@kokizzu
Last active October 21, 2022 20:44
Show Gist options
  • Save kokizzu/27fc52bc9a8a120045c2dfdd5f255f19 to your computer and use it in GitHub Desktop.
Save kokizzu/27fc52bc9a8a120045c2dfdd5f255f19 to your computer and use it in GitHub Desktop.
CockroachDB PoC partial code
package model
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/kokizzu/gotro/L"
"github.com/kokizzu/gotro/T"
)
// DbConn wraps a pgx connection by embedding *pgx.Conn, so the connection's
// methods can be called directly on a DbConn value.
type DbConn struct {
*pgx.Conn
}
// ConnectCockroach parses databaseUrl, sets the session's application_name
// runtime parameter to `stormon_` followed by T.Filename(), and opens the
// connection. Parse and connect errors are handed to L.PanicIf.
func ConnectCockroach(databaseUrl string) DbConn {
	cfg, parseErr := pgx.ParseConfig(databaseUrl)
	L.PanicIf(parseErr, `pgx.ParseConfig`)

	appName := `stormon_` + T.Filename()
	cfg.RuntimeParams["application_name"] = appName

	pgConn, connErr := pgx.ConnectConfig(context.Background(), cfg)
	L.PanicIf(connErr, `pgx.ConnectConfig`)

	return DbConn{Conn: pgConn}
}
package model
import (
"context"
"log"
"github.com/jackc/pgconn"
"github.com/kokizzu/gotro/L"
)
// MigrateCockroach applies every pending schema migration in order.
//
// Each entry of the migrations list is identified by its index. The first
// pending index is MAX(id)+1 from schema_migrations, or 0 when that table is
// empty or does not exist yet. After a migration statement succeeds, its
// index is recorded in schema_migrations. A failing migration statement or
// bookkeeping insert is handed to L.PanicIf.
//
// New migrations must only be appended to the list: reordering or removing
// entries changes the index of migrations that were already recorded.
func (c *DbConn) MigrateCockroach() {
	ctx := context.Background()
	var migrations = []string{
		`
CREATE TABLE IF NOT EXISTS "schema_migrations" (
id INT NOT NULL PRIMARY KEY
, CreatedAt TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`,
		// spec 2-11,13
		ResellersRow_Migrate1,
		ServersRow_Migrate1,
		TimeGroupsRow_Migrate1,
		HostStatsRow_Migrate1,
		DiskStatsRow_Migrate1,
		ProcStatsRow_Migrate1,
		SystemdServicesRow_Migrate1,
		SystemdTimersRow_Migrate1,
		SpvCtlStatsRow_Migrate1,
		// alert sent
		AlertsRow_Migrate1,
		// indexes
		ProcStatsRow_Migrate2,
		ProcStatsRow_Migrate3,
		ProcStatsRow_Migrate4,
		// spec 12: influxdb
		// spec 14-21: hal/ES (or add more influxdb on rocket-s3)
	}
	const nextMigrationQuery = `
SELECT COALESCE(
(SELECT MAX(id)+1 FROM schema_migrations), 0
)`
	// startMigration stays 0 when the lookup fails, so a fresh database runs
	// the whole list starting with the schema_migrations table itself.
	var startMigration int
	err := c.QueryRow(ctx, nextMigrationQuery).Scan(&startMigration)
	if err != nil && !isMissingSchemaMigrations(err) {
		// Unexpected failure: log it and keep going, as before.
		L.IsError(err, `row.Scan: %s`, nextMigrationQuery)
	}
	totalMigration := len(migrations)
	if startMigration < totalMigration {
		log.Println(`start migration:`, startMigration)
	}
	for z := startMigration; z < totalMigration; z++ {
		migrationQuery := migrations[z]
		_, err := c.Exec(ctx, migrationQuery)
		L.PanicIf(err, `c.Exec: %s`, migrationQuery)
		const insertQuery = `INSERT INTO schema_migrations (id) VALUES ($1)`
		_, err = c.Exec(ctx, insertQuery, z)
		L.PanicIf(err, `c.Exec: %s`, insertQuery)
	}
	log.Println(`total migration:`, totalMigration)
}

// isMissingSchemaMigrations reports whether err says that the
// schema_migrations table does not exist yet (SQLSTATE 42P01), which is the
// expected state before the very first migration run.
func isMissingSchemaMigrations(err error) bool {
	const undefinedTable = `42P01`
	switch e := err.(type) {
	case *pgconn.PgError:
		return e.Code == undefinedTable ||
			e.Message == `relation "schema_migrations" does not exist`
	case interface{ SQLState() string }:
		// The connection is opened with pgx v5, whose PgError is a different
		// type from the pgconn one matched above; match it by its SQLState
		// method so the expected error is not logged as a failure.
		return e.SQLState() == undefinedTable
	}
	return false
}
package model
import (
	"bytes"
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/georgysavva/scany/v2/pgxscan"
	"github.com/kokizzu/gotro/C"
	"github.com/kokizzu/gotro/L"
	"github.com/kokizzu/gotro/M"
	"github.com/kokizzu/gotro/S"
)
// D double-quotes s for use as an SQL identifier (for example a table name)
// by delegating to S.ZZ.
// NOTE(review): the exact trimming/escaping rules are those of S.ZZ, which is
// not visible here — confirm before passing untrusted input.
func D(s string) string {
return S.ZZ(s)
}
// E sanitizes s by keeping only the runes accepted by C.IsIdent and dropping
// every other rune.
func E(s string) string {
	var kept bytes.Buffer
	for _, r := range s {
		if C.IsIdent(r) {
			kept.WriteRune(r)
		}
	}
	return kept.String()
}
// TODO: create prepared statement cache

// ToUpdateCols generates the `, "col" = $n` fragments of an UPDATE statement
// together with the matching positional arguments.
//
// vals holds the arguments the caller has already bound; placeholders for the
// entries of rows are numbered from len(vals)+1 and their values are appended
// to vals in the same order. The returned string starts with `, ` so it can
// be appended directly after a caller-supplied first assignment.
//
// Columns are emitted in sorted key order, so the same set of columns always
// produces the same SQL text instead of following Go's random map iteration
// order.
//
// NOTE(review): column names are quoted with D here while ToInsertCols uses
// E; confirm the quoted form matches how the columns were created.
func ToUpdateCols(rows M.SX, vals ...any) (string, []interface{}) {
	keys := make([]string, 0, len(rows))
	for k := range rows {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	next := len(vals) + 1
	res := bytes.Buffer{}
	for _, k := range keys {
		res.WriteString(`, `)
		res.WriteString(D(k))
		res.WriteString(` = $`)
		res.WriteString(fmt.Sprint(next))
		vals = append(vals, rows[k])
		next++
	}
	return res.String(), vals
}
// UpdateRow sets UpdatedAt to now plus every column in cols on the row of
// table whose Id equals id. It returns false when L.IsError reports an error
// from the statement, true otherwise.
// Please make sure `table` is sanitized against SQL injection.
func UpdateRow(db DbConn, table string, id int64, now time.Time, cols M.SX) bool {
	// $1 and $2 are bound to now and id; the columns of cols follow from $3.
	assignments, args := ToUpdateCols(cols, now, id)
	stmt := `
UPDATE ` + D(table) + ` SET
UpdatedAt = $1
` + assignments + `
WHERE Id = $2
`
	_, execErr := db.Conn.Exec(context.Background(), stmt, args...)
	failed := L.IsError(execErr, `db.Conn.Exec: %v`, stmt)
	return !failed
}
// FindRowById loads the row of table whose Id equals id into target.
// target is only overwritten when the query succeeds and yields at least one
// row. The result is the negation of L.IsError for the query error, so a
// query that succeeds without matching any row still returns true.
// Please make sure `table` is sanitized against SQL injection.
func FindRowById[T any](db DbConn, table string, id int64, target *T) bool {
	stmt := `
SELECT *
FROM ` + D(table) + `
WHERE Id = $1
`
	var found []T
	selectErr := pgxscan.Select(context.Background(), db.Conn, &found, stmt, id)
	if selectErr == nil && len(found) > 0 {
		*target = found[0]
	}
	failed := L.IsError(selectErr, `pgxscan.Select: %v`, stmt)
	return !failed
}
// ToInsertCols generates the pieces of an INSERT statement for the entries
// of rows and returns, in this order:
//
//  1. the placeholder list (`, $n, $n+1, ...`),
//  2. the column-name list (`, colA, colB, ...`, each name passed through E),
//  3. vals with the values of rows appended.
//
// vals holds the arguments the caller has already bound; placeholders are
// numbered from len(vals)+1. Both strings start with `, ` so they can be
// appended after caller-supplied leading columns and values.
//
// Columns are emitted in sorted key order, so the same set of columns always
// produces the same SQL text instead of following Go's random map iteration
// order.
func ToInsertCols(rows M.SX, vals ...any) (string, string, []interface{}) {
	keys := make([]string, 0, len(rows))
	for k := range rows {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	next := len(vals) + 1
	placeholders := bytes.Buffer{}
	columns := bytes.Buffer{}
	for _, k := range keys {
		columns.WriteString(`, `)
		columns.WriteString(E(k))
		placeholders.WriteString(`, $`)
		placeholders.WriteString(fmt.Sprint(next))
		vals = append(vals, rows[k])
		next++
	}
	return placeholders.String(), columns.String(), vals
}
// InsertRow inserts one row into table with CreatedAt and UpdatedAt both set
// to now plus the columns in kvs, and returns the Id reported by
// `RETURNING Id`. A scan error is handed to L.IsError; row then keeps
// whatever Scan left in it.
// Please make sure `table` is sanitized against SQL injection.
func InsertRow(db DbConn, table string, now time.Time, kvs M.SX) (row int64) {
	// ToInsertCols returns the placeholder list first, then the column list.
	placeholders, columns, args := ToInsertCols(kvs, now)
	stmt := `
INSERT INTO ` + D(table) + `
(CreatedAt, UpdatedAt` + columns + `)
VALUES($1, $1` + placeholders + `)
RETURNING Id
`
	scanErr := db.QueryRow(context.Background(), stmt, args...).Scan(&row)
	L.IsError(scanErr, `db.QueryRow: %v`, stmt)
	return row
}
package deploy
import (
"sync"
"syscall"
"github.com/kokizzu/goproc"
"github.com/kokizzu/gotro/L"
"github.com/kokizzu/gotro/S"
)
// Database describes a cockroach process started by SpawnCockroach.
type Database struct {
// ProcessManager is the goproc instance the cockroach command was added to.
ProcessManager *goproc.Goproc
// ProcessId is the id returned by ProcessManager.AddCommand; Close uses it
// to signal the process.
ProcessId goproc.CommandId
// ConnectionString is the PostgreSQL-style URL set by SpawnCockroach.
ConnectionString string
}
// SpawnCockroach starts a single-node, insecure cockroach instance from the
// binary at cockroachPath and returns a handle to it.
//
// It first runs `killall cockroach` through a separate goproc instance whose
// HasErrFunc is goproc.DiscardHasErr. It then launches the new node in a
// background goroutine and blocks until a stdout line starting with `nodeID:`
// is seen. There is no timeout: if that line never appears, the call never
// returns.
func SpawnCockroach(cockroachPath string) *Database {
	db := &Database{}

	// Best-effort cleanup of previously running instances.
	cleanup := goproc.New()
	cleanup.HasErrFunc = goproc.DiscardHasErr
	cleanup.AddCommand(&goproc.Cmd{
		Program:    `killall`,
		Parameters: []string{`cockroach`},
	})
	cleanup.StartAll()

	db.ProcessManager = goproc.New()

	var ready sync.WaitGroup
	ready.Add(1)
	started := false
	// Release the waiter exactly once, on the first `nodeID:` line.
	watchStdout := func(cmd *goproc.Cmd, line string) error {
		if started || !S.StartsWith(line, `nodeID:`) {
			return nil
		}
		ready.Done()
		started = true
		return nil
	}

	db.ProcessId = db.ProcessManager.AddCommand(&goproc.Cmd{
		Program: cockroachPath,
		Parameters: []string{
			`start-single-node`,
			`--advertise-addr`,
			`localhost`,
			`--insecure`,
		},
		OnStdout: watchStdout,
	})
	go db.ProcessManager.StartAll()
	ready.Wait()

	db.ConnectionString = `postgresql://root@localhost:26257/defaultdb?sslmode=disable`
	return db
}
// Close sends SIGTERM to the process identified by d.ProcessId through the
// process manager. A signalling error is handed to L.IsError; nothing is
// returned to the caller.
func (d *Database) Close() {
err := d.ProcessManager.Signal(d.ProcessId, syscall.SIGTERM)
L.IsError(err, `d.ProcessManager.Signal`)
}
package model
import (
"time"
"github.com/kokizzu/gotro/M"
"github.com/kpango/fastime"
)
//go:generate gomodifytags -all -remove-tags db -w -file table_procStats.go
//go:generate gomodifytags -all -add-tags db -transform lowercase --skip-unexported -w -file table_procStats.go
// ProcStats is the name of the table that stores ProcStatsRow records.
const ProcStats = `ProcStats`
// ProcStatsRow mirrors one row of the "ProcStats" table. The `db` tags carry
// the lowercase column names (generated by the gomodifytags directives above).
type ProcStatsRow struct {
Id int64 `db:"id"`
CreatedAt time.Time `db:"createdat"`
UpdatedAt time.Time `db:"updatedat"`
TimeGroupId int64 `db:"timegroupid"`
ServerId int64 `db:"serverid"`
DC string `db:"dc"`
Pid int64 `db:"pid"`
Uptime int64 `db:"uptime"`
Pcpu float32 `db:"pcpu"`
Rss int64 `db:"rss"`
Cmd string `db:"cmd"`
Process string `db:"process"`
Version string `db:"version"`
Reseller string `db:"reseller"`
}
// Column names of the "ProcStats" table, used as keys of the M.SX map that
// ProcStatsRow.Insert passes to InsertRow.
const (
PsTimeGroupId = `TimeGroupId`
PsServerId = `ServerId`
PsDC = `DC`
PsPid = `Pid`
PsUptime = `Uptime`
PsPcpu = `Pcpu`
PsRss = `Rss`
PsCmd = `Cmd`
PsProcess = `Process`
PsVersion = `Version`
PsReseller = `Reseller`
)
// ProcStatsRow_Migrate1 creates the "ProcStats" table if it does not exist.
const ProcStatsRow_Migrate1 = `
CREATE TABLE IF NOT EXISTS "ProcStats" (
Id BIGSERIAL NOT NULL PRIMARY KEY
, CreatedAt TIMESTAMPTZ NOT NULL DEFAULT NOW()
, UpdatedAt TIMESTAMPTZ NOT NULL DEFAULT NOW()
, TimeGroupId BIGINT NOT NULL
, ServerId BIGINT NOT NULL
, Pid INT NOT NULL DEFAULT 0
, Uptime BIGINT NOT NULL DEFAULT 0
, Pcpu FLOAT NOT NULL DEFAULT 0
, Rss INT NOT NULL DEFAULT 0
, Cmd STRING NOT NULL DEFAULT ''
, Process STRING NOT NULL DEFAULT ''
, Version STRING NOT NULL DEFAULT ''
, Reseller STRING NOT NULL DEFAULT ''
, DC STRING NOT NULL DEFAULT ''
)`
// ProcStatsRow_Migrate2 adds an index on "ProcStats" (TimeGroupId).
const ProcStatsRow_Migrate2 = `
CREATE INDEX IF NOT EXISTS "ProcStats_TimeGroupId" ON "ProcStats" (TimeGroupId)
`
// ProcStatsRow_Migrate3 adds an index on "ProcStats" (ServerId).
const ProcStatsRow_Migrate3 = `
CREATE INDEX IF NOT EXISTS "ProcStats_ServerId" ON "ProcStats" (ServerId)
`
// ProcStatsRow_Migrate4 adds an index on "ProcStats" (Reseller).
const ProcStatsRow_Migrate4 = `
CREATE INDEX IF NOT EXISTS "ProcStats_Reseller" ON "ProcStats" (Reseller)
`
// Insert stores r as a new "ProcStats" row and writes the returned id to
// r.Id. The row timestamp is r.CreatedAt, or fastime.Now() when
// r.CreatedAt.Unix() is not positive; r.CreatedAt itself is not modified.
// It returns true when the resulting r.Id is greater than zero.
func (r *ProcStatsRow) Insert(db DbConn) bool {
	createdAt := r.CreatedAt
	if createdAt.Unix() <= 0 {
		createdAt = fastime.Now()
	}
	fields := M.SX{
		PsTimeGroupId: r.TimeGroupId,
		PsServerId:    r.ServerId,
		PsDC:          r.DC,
		PsPid:         r.Pid,
		PsUptime:      r.Uptime,
		PsPcpu:        r.Pcpu,
		PsRss:         r.Rss,
		PsCmd:         r.Cmd,
		PsProcess:     r.Process,
		PsVersion:     r.Version,
		PsReseller:    r.Reseller,
	}
	r.Id = InsertRow(db, ProcStats, createdAt, fields)
	return r.Id > 0
}
// FindById loads the "ProcStats" row with the given id into r by calling
// FindRowById and returns that call's result.
func (r *ProcStatsRow) FindById(db DbConn, id int64) bool {
return FindRowById(db, ProcStats, id, r)
}
package model
import (
"testing"
"time"
"github.com/hexops/autogold"
"github.com/stretchr/testify/assert"
)
// how to regenerate the test:
/*
go test -run TestProcStatsRowOperations -update table_procStats_test.go
*/

// TestProcStatsRowOperations inserts a ProcStatsRow into the test database
// and reads it back by id. The timestamps of the row read back are checked to
// be set and then zeroed by nonZero, so the row can be compared with ps,
// whose timestamps are zero.
func TestProcStatsRowOperations(t *testing.T) {
	ps := ProcStatsRow{
		CreatedAt:   time.Time{},
		UpdatedAt:   time.Time{},
		TimeGroupId: 2,
		ServerId:    3,
		DC:          "4",
		Pid:         5,
		Uptime:      6,
		Pcpu:        7,
		Rss:         8,
		Cmd:         "9",
		Process:     "10",
		Version:     "11",
		Reseller:    "12",
	}
	t.Run(`InsertNew`, func(t *testing.T) {
		ok := ps.Insert(testDb)
		assert.True(t, ok)
	})
	t.Run(`FindById`, func(t *testing.T) {
		row := ProcStatsRow{}
		// Check the lookup result instead of discarding it, so a failed
		// query is reported directly.
		ok := row.FindById(testDb, ps.Id)
		assert.True(t, ok)
		nonZero(t, &row.CreatedAt)
		nonZero(t, &row.UpdatedAt)
		want := autogold.Want(`row.FindById`, ps)
		want.Equal(t, row)
	})
}
package model
import (
"os"
"path"
"testing"
"time"
"github.com/kokizzu/gotro/S"
"stormon/deploy"
)
// testDb is the connection shared by all tests of this package; TestMain
// sets it before any test runs.
var testDb DbConn

// TestMain spawns a cockroach node from `./cockroach`, connects to it, runs
// the migrations, executes the tests, and finally signals the node to stop
// before exiting with the test result code.
func TestMain(m *testing.M) {
	// When started inside the `model` directory, move one level up
	// (presumably so the relative ./cockroach path resolves — verify).
	if cwd, _ := os.Getwd(); S.EndsWith(cwd, `model`) {
		_ = os.Chdir(path.Join(cwd, `..`))
	}

	node := deploy.SpawnCockroach(`./cockroach`)
	testDb = ConnectCockroach(node.ConnectionString)
	testDb.MigrateCockroach()

	exitCode := m.Run()
	node.Close()
	os.Exit(exitCode)
}
// nonZero marks the test as failed when at.Unix() is not positive.
// Otherwise it resets *at to the zero time.Time, so the surrounding struct
// can be compared against an expectation whose timestamps are zero.
func nonZero(t *testing.T, at *time.Time) {
	if at.Unix() <= 0 {
		t.Helper()
		t.Fail()
		return
	}
	*at = time.Time{}
}
package model
import (
"context"
"database/sql"
"time"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/kokizzu/gotro/L"
"github.com/kokizzu/gotro/M"
"github.com/kpango/fastime"
"github.com/pkg/errors"
)
//go:generate gomodifytags -all -remove-tags db -w -file table_timeGroups.go
//go:generate gomodifytags -all -add-tags db -transform lowercase --skip-unexported -w -file table_timeGroups.go
// TimeGroups is the name of the table that stores TimeGroupsRow records.
const TimeGroups = `TimeGroups`
// TimeGroupsRow mirrors one row of the "TimeGroups" table. The `db` tags
// carry the lowercase column names (generated by the gomodifytags directives
// above).
type TimeGroupsRow struct {
Id int64 `db:"id"`
CreatedAt time.Time `db:"createdat"`
UpdatedAt time.Time `db:"updatedat"`
DiskStatsActive int32 `db:"diskstatsactive"`
DiskStatsTimeout int32 `db:"diskstatstimeout"`
ProcStatsActive int32 `db:"procstatsactive"`
ProcStatsTimeout int32 `db:"procstatstimeout"`
SystemdServicesActive int32 `db:"systemdservicesactive"`
SystemdServicesTimeout int32 `db:"systemdservicestimeout"`
SystemdTimersActive int32 `db:"systemdtimersactive"`
SystemdTimersTimeout int32 `db:"systemdtimerstimeout"`
SpvCtlActive int32 `db:"spvctlactive"`
SpvCtlTimeout int32 `db:"spvctltimeout"`
AlertsSuccess int32 `db:"alertssuccess"`
AlertsFailed int32 `db:"alertsfailed"`
}
// Column names of the "TimeGroups" table, intended as keys of the M.SX map
// given to TimeGroupsRow.Update / UpdateRow.
const (
TgDiskStatsActive = `DiskStatsActive`
TgDiskStatsTimeout = `DiskStatsTimeout`
TgProcStatsActive = `ProcStatsActive`
TgProcStatsTimeout = `ProcStatsTimeout`
TgSystemdServicesActive = `SystemdServicesActive`
TgSystemdServicesTimeout = `SystemdServicesTimeout`
TgSystemdTimersActive = `SystemdTimersActive`
TgSystemdTimersTimeout = `SystemdTimersTimeout`
TgSpvCtlActive = `SpvCtlActive`
TgSpvCtlTimeout = `SpvCtlTimeout`
TgAlertsSuccess = `AlertsSuccess`
TgAlertsFailed = `AlertsFailed`
)
// TimeGroupsRow_Migrate1 creates the "TimeGroups" table if it does not exist.
const TimeGroupsRow_Migrate1 = `
CREATE TABLE IF NOT EXISTS "TimeGroups" (
Id BIGSERIAL NOT NULL PRIMARY KEY
, CreatedAt TIMESTAMPTZ NOT NULL DEFAULT NOW()
, UpdatedAt TIMESTAMPTZ NOT NULL DEFAULT NOW()
, DiskStatsActive INT NOT NULL DEFAULT 0
, DiskStatsTimeout INT NOT NULL DEFAULT 0
, ProcStatsActive INT NOT NULL DEFAULT 0
, ProcStatsTimeout INT NOT NULL DEFAULT 0
, SystemdServicesActive INT NOT NULL DEFAULT 0
, SystemdServicesTimeout INT NOT NULL DEFAULT 0
, SystemdTimersActive INT NOT NULL DEFAULT 0
, SystemdTimersTimeout INT NOT NULL DEFAULT 0
, SpvCtlActive INT NOT NULL DEFAULT 0
, SpvCtlTimeout INT NOT NULL DEFAULT 0
, AlertsSuccess INT NOT NULL DEFAULT 0
, AlertsFailed INT NOT NULL DEFAULT 0
)`
// LastScan loads the most recently created "TimeGroups" row into r.
//
// It returns true when the query succeeded, including when the table holds
// no rows yet (r is then left untouched). It returns false, after handing
// the error to L.IsError, when the query itself failed. Previously a failed
// query also produced an empty result and was therefore reported as success
// without being logged.
func (r *TimeGroupsRow) LastScan(db DbConn) (ok bool) {
	ctx := context.Background()
	const query = `
SELECT * FROM "TimeGroups" ORDER BY CreatedAt DESC LIMIT 1
`
	var rows []TimeGroupsRow
	err := pgxscan.Select(ctx, db.Conn, &rows, query)
	if errors.Is(err, sql.ErrNoRows) {
		return true // no rows is not an error
	}
	if L.IsError(err, `pgxscan.Select: %v`, query) {
		return false
	}
	if len(rows) == 0 {
		return true // nothing recorded yet
	}
	*r = rows[0]
	return true
}
// Reset overwrites r with the zero TimeGroupsRow, clearing every field
// including Id.
func (r *TimeGroupsRow) Reset() {
*r = TimeGroupsRow{}
}
// FindById reloads r from the "TimeGroups" row whose Id equals r.Id by
// calling FindRowById and returns that call's result.
func (r *TimeGroupsRow) FindById(db DbConn) bool {
return FindRowById(db, TimeGroups, r.Id, r)
}
// InsertNew inserts a new "TimeGroups" row with CreatedAt and UpdatedAt set
// to now and every other column left to its database default.
//
// On success r is replaced by the returned row (Id, CreatedAt, UpdatedAt),
// which discards any other field values r held before, and true is returned.
// On failure only r.CreatedAt and r.UpdatedAt have been changed and false is
// returned.
func (r *TimeGroupsRow) InsertNew(db DbConn, now time.Time) bool {
	r.CreatedAt, r.UpdatedAt = now, now
	const query = `
INSERT INTO "TimeGroups"
(CreatedAt, UpdatedAt)
VALUES($1, $1)
RETURNING Id, CreatedAt, UpdatedAt
`
	inserted := []TimeGroupsRow{{CreatedAt: now, UpdatedAt: now}}
	selectErr := pgxscan.Select(context.Background(), db.Conn, &inserted, query, now)
	if L.IsError(selectErr, `pgxscan.Select: %v`, query) {
		return false
	}
	*r = inserted[0]
	return true
}
// Update writes the columns in kvs to the "TimeGroups" row whose Id equals
// r.Id, using fastime.Now() as the update timestamp, and returns the result
// of UpdateRow. The fields of r itself are not modified.
func (r *TimeGroupsRow) Update(db DbConn, kvs M.SX) bool {
return UpdateRow(db, TimeGroups, r.Id, fastime.Now(), kvs)
}
package model
import (
"testing"
"time"
"github.com/hexops/autogold"
"github.com/kokizzu/gotro/M"
"github.com/stretchr/testify/assert"
)
// how to regenerate the test:
/*
go test -run TestTimeGroupsRowOperations -update table_timeGroups_test.go
*/

// TestTimeGroupsRowOperations inserts a TimeGroupsRow, updates all of its
// counters one hour later, and reads the row back to check both the counters
// and the two timestamps.
func TestTimeGroupsRowOperations(t *testing.T) {
	tg := TimeGroupsRow{
		Id:                     1,
		CreatedAt:              time.Time{},
		UpdatedAt:              time.Time{},
		DiskStatsActive:        3,
		DiskStatsTimeout:       4,
		ProcStatsActive:        5,
		ProcStatsTimeout:       6,
		SystemdServicesActive:  7,
		SystemdServicesTimeout: 8,
		SystemdTimersActive:    9,
		SystemdTimersTimeout:   10,
		SpvCtlActive:           11,
		SpvCtlTimeout:          12,
		AlertsSuccess:          13,
		AlertsFailed:           14,
	}
	now, _ := time.Parse(`2006-01-02`, `2022-12-31`)
	now = now.UTC()
	t.Run(`InsertNew`, func(t *testing.T) {
		// will reset tg receiver struct values
		ok := tg.InsertNew(testDb, now)
		assert.True(t, ok)
		empty := TimeGroupsRow{}
		empty.Id = tg.Id
		empty.CreatedAt = now
		empty.UpdatedAt = now
		want := autogold.Want(`tg.InsertNew_MustEmpty`, empty)
		want.Equal(t, tg)
	})
	// time.Time.Add returns a new value; the original statement discarded
	// it, so the update ran with the creation timestamp. Use a distinct,
	// later timestamp so the test shows that UpdatedAt moves while CreatedAt
	// stays put.
	updatedAt := now.Add(time.Hour)
	t.Run(`UpdateRow`, func(t *testing.T) {
		ok := UpdateRow(testDb, TimeGroups, tg.Id, updatedAt, M.SX{
			TgDiskStatsActive:        31,
			TgDiskStatsTimeout:       41,
			TgProcStatsActive:        51,
			TgProcStatsTimeout:       61,
			TgSystemdServicesActive:  71,
			TgSystemdServicesTimeout: 81,
			TgSystemdTimersActive:    91,
			TgSystemdTimersTimeout:   101,
			TgSpvCtlActive:           111,
			TgSpvCtlTimeout:          121,
			TgAlertsSuccess:          131,
			TgAlertsFailed:           141,
		})
		assert.True(t, ok)
	})
	t.Run(`FindRowById`, func(t *testing.T) {
		row := TimeGroupsRow{Id: tg.Id}
		ok := row.FindById(testDb)
		assert.True(t, ok)
		want := autogold.Want(`tg.FindRowById`, TimeGroupsRow{
			Id:                     tg.Id,
			CreatedAt:              now,
			UpdatedAt:              updatedAt,
			DiskStatsActive:        31,
			DiskStatsTimeout:       41,
			ProcStatsActive:        51,
			ProcStatsTimeout:       61,
			SystemdServicesActive:  71,
			SystemdServicesTimeout: 81,
			SystemdTimersActive:    91,
			SystemdTimersTimeout:   101,
			SpvCtlActive:           111,
			SpvCtlTimeout:          121,
			AlertsSuccess:          131,
			AlertsFailed:           141,
		})
		want.Equal(t, row)
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment