Created
September 10, 2023 06:43
-
-
Save aldy505/29f1a667600bb94254b7669733667049 to your computer and use it in GitHub Desktop.
Multi database on runtime proof-of-concept
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
services:
  database:
    image: postgres:15-alpine
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: password
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module datarace-repro

go 1.21.1

require (
	github.com/go-chi/chi/v5 v5.0.10
	github.com/gofiber/fiber/v2 v2.49.1
	github.com/lib/pq v1.10.9
)

require (
	github.com/andybalholm/brotli v1.0.5 // indirect
	github.com/google/uuid v1.3.1 // indirect
	github.com/klauspost/compress v1.16.7 // indirect
	github.com/mattn/go-colorable v0.1.13 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/mattn/go-runewidth v0.0.15 // indirect
	github.com/rivo/uniseg v0.2.0 // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
	github.com/valyala/fasthttp v1.49.0 // indirect
	github.com/valyala/tcplisten v1.0.0 // indirect
	golang.org/x/sys v0.11.0 // indirect
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// How to run this file:
//
// Start the PostgreSQL server first:
//
//	docker compose up -d
//
// Then choose a framework. To run the net/http server, just do `go run .`.
// To run the Fiber server instead (which is, for some reason, a bit buggy),
// do `go run . fiber`.
//
// A few endpoints that you can hit afterwards:
//
//	GET http://localhost:4000/register/{name}
//	GET http://localhost:4000/random/{name}
//	GET http://localhost:4000/peek-map
//
// Enjoy!
package main | |
import ( | |
"context" | |
"database/sql" | |
"encoding/json" | |
"fmt" | |
"log" | |
"math/rand" | |
"net/http" | |
"os" | |
"os/signal" | |
"strings" | |
"sync" | |
"time" | |
"github.com/go-chi/chi/v5" | |
"github.com/go-chi/chi/v5/middleware" | |
"github.com/gofiber/fiber/v2" | |
"github.com/gofiber/fiber/v2/middleware/logger" | |
_ "github.com/lib/pq" | |
) | |
// TenantConnectionMapper keeps one connection pool per tenant database next to
// the master pool that is used to create those databases.
//
// The embedded RWMutex guards TenantConn: HTTP handlers and background
// goroutines access the map concurrently, so hold RLock for lookups and
// iteration and Lock for inserts.
type TenantConnectionMapper struct {
	// GlobalDatabaseConfig is the connection string shared by every tenant,
	// without a dbname entry; the tenant's database name is appended to it.
	GlobalDatabaseConfig string
	// MasterConn is the pool on which CREATE DATABASE statements are run.
	MasterConn *sql.DB
	// TenantConn maps a tenant name to the pool for that tenant's database.
	TenantConn map[string]*sql.DB
	sync.RWMutex
}
func main() { | |
databaseHost, ok := os.LookupEnv("DB_HOST") | |
if !ok { | |
databaseHost = "localhost" | |
} | |
databasePort, ok := os.LookupEnv("DB_PORT") | |
if !ok { | |
databasePort = "5432" | |
} | |
databaseUser, ok := os.LookupEnv("DB_USER") | |
if !ok { | |
databaseUser = "postgres" | |
} | |
databasePassword, ok := os.LookupEnv("DB_PASSWORD") | |
if !ok { | |
databasePassword = "password" | |
} | |
databaseMasterName, ok := os.LookupEnv("DB_MASTER_NAME") | |
if !ok { | |
databaseMasterName = "postgres" | |
} | |
masterConn, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", databaseHost, databasePort, databaseUser, databasePassword, databaseMasterName)) | |
if err != nil { | |
log.Fatalf("connecting to master database: %s", err.Error()) | |
return | |
} | |
defer func() { | |
err := masterConn.Close() | |
if err != nil { | |
log.Printf("closing master database: %s", err.Error()) | |
} | |
}() | |
tenantConnectionMapper := &TenantConnectionMapper{ | |
GlobalDatabaseConfig: fmt.Sprintf("host=%s port=%s user=%s password=%s sslmode=disable", databaseHost, databasePort, databaseUser, databasePassword), | |
MasterConn: masterConn, | |
TenantConn: make(map[string]*sql.DB), | |
} | |
if len(os.Args) > 1 && os.Args[1] == "fiber" { | |
app := fiber.New() | |
app.Use(logger.New()) | |
// NOTE: DO NOT use GET on production, use something else instead. | |
// This is for having everything easy with cURL and the like. | |
app.Get("/register/:tenant", func(c *fiber.Ctx) error { | |
tenant := c.Params("tenant") | |
if tenant == "" { | |
c.Status(fiber.StatusBadRequest) | |
return c.JSON(map[string]string{"message": "tenant is empty"}) | |
} | |
err := createNewTenantDatabase(c.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
return err | |
} | |
err = setupNewTenantTables(c.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
return err | |
} | |
go startRandomNumberSeed(tenantConnectionMapper, tenant) | |
c.Status(fiber.StatusOK) | |
return c.JSON(map[string]string{"message": "tenant has been created"}) | |
}) | |
app.Get("/random/:tenant", func(c *fiber.Ctx) error { | |
tenant := c.Params("tenant") | |
if tenant == "" { | |
c.Status(fiber.StatusBadRequest) | |
return c.JSON(map[string]string{"message": "tenant is empty"}) | |
} | |
rows, err := getRandomNumberTableByTenant(c.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
return err | |
} | |
return c.JSON(rows) | |
}) | |
app.Get("/peek-map", func(c *fiber.Ctx) error { | |
var tenants []string | |
for name := range tenantConnectionMapper.TenantConn { | |
tenants = append(tenants, name) | |
} | |
return c.JSON(tenants) | |
}) | |
errChan := make(chan os.Signal, 1) | |
signal.Notify(errChan, os.Interrupt) | |
go func() { | |
<-errChan | |
app.Shutdown() | |
}() | |
app.Listen(":4000") | |
} else { | |
app := chi.NewRouter() | |
app.Use(middleware.Logger) | |
app.Get("/register/{tenant}", func(w http.ResponseWriter, r *http.Request) { | |
tenant := chi.URLParam(r, "tenant") | |
if tenant == "" { | |
w.WriteHeader(http.StatusBadRequest) | |
json.NewEncoder(w).Encode(map[string]string{"message": "tenant is empty"}) | |
return | |
} | |
err := createNewTenantDatabase(r.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
w.Write([]byte(err.Error())) | |
return | |
} | |
err = setupNewTenantTables(r.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
w.Write([]byte(err.Error())) | |
return | |
} | |
go startRandomNumberSeed(tenantConnectionMapper, tenant) | |
w.WriteHeader(http.StatusOK) | |
json.NewEncoder(w).Encode(map[string]string{"message": "tenant has been created"}) | |
}) | |
app.Get("/random/{tenant}", func(w http.ResponseWriter, r *http.Request) { | |
tenant := chi.URLParam(r, "tenant") | |
if tenant == "" { | |
w.WriteHeader(http.StatusBadRequest) | |
json.NewEncoder(w).Encode(map[string]string{"message": "tenant is empty"}) | |
return | |
} | |
rows, err := getRandomNumberTableByTenant(r.Context(), tenantConnectionMapper, tenant) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
w.Write([]byte(err.Error())) | |
return | |
} | |
w.WriteHeader(http.StatusOK) | |
json.NewEncoder(w).Encode(rows) | |
}) | |
app.Get("/peek-map", func(w http.ResponseWriter, r *http.Request) { | |
var tenants []string | |
for name := range tenantConnectionMapper.TenantConn { | |
tenants = append(tenants, name) | |
} | |
w.WriteHeader(http.StatusOK) | |
json.NewEncoder(w).Encode(tenants) | |
}) | |
errChan := make(chan os.Signal, 1) | |
signal.Notify(errChan, os.Interrupt) | |
server := &http.Server{ | |
Handler: app, | |
Addr: ":4000", | |
} | |
go func() { | |
<-errChan | |
server.Shutdown(context.Background()) | |
}() | |
log.Printf("server started on %s", server.Addr) | |
server.ListenAndServe() | |
} | |
// Close the connection after the app is shut down | |
tenantConnectionMapper.RLock() | |
for tenant, conn := range tenantConnectionMapper.TenantConn { | |
err := conn.Close() | |
if err != nil { | |
log.Printf("closing %s database: %s", tenant, err.Error()) | |
} | |
} | |
tenantConnectionMapper.RUnlock() | |
} | |
func createNewTenantDatabase(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) error { | |
_, err := tenantMapper.MasterConn.ExecContext(ctx, `CREATE DATABASE `+tenantName) | |
if err != nil && strings.Contains(err.Error(), "already exists") { | |
return err | |
} | |
tenantConnection, err := sql.Open("postgres", tenantMapper.GlobalDatabaseConfig+" dbname= "+tenantName) | |
if err != nil { | |
return err | |
} | |
tenantMapper.Lock() | |
defer tenantMapper.Unlock() | |
tenantMapper.TenantConn[tenantName] = tenantConnection | |
return nil | |
} | |
func setupNewTenantTables(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) error { | |
// No need to try lock, we're not doing a write operation | |
tenantConnection, ok := tenantMapper.TenantConn[tenantName] | |
if !ok { | |
return fmt.Errorf("tenant is not set: %s", tenantName) | |
} | |
conn, err := tenantConnection.Conn(ctx) | |
if err != nil { | |
return err | |
} | |
defer func() { | |
err := conn.Close() | |
if err != nil { | |
log.Printf("[%s] closing connection: %s", tenantName, err.Error()) | |
} | |
}() | |
_, err = conn.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS random_number ( | |
timestamp TIMESTAMP DEFAULT NOW(), | |
value INTEGER NOT NULL | |
)`) | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |
func startRandomNumberSeed(tenantMapper *TenantConnectionMapper, tenantName string) { | |
ctx := context.Background() | |
for { | |
tenantConnection, ok := tenantMapper.TenantConn[tenantName] | |
if !ok { | |
log.Printf("tenant not found: %s", tenantName) | |
time.Sleep(time.Second) | |
continue | |
} | |
conn, err := tenantConnection.Conn(ctx) | |
if err != nil { | |
log.Printf("[%s] acquiring connection from pool: %s", tenantName, err.Error()) | |
time.Sleep(time.Second) | |
continue | |
} | |
_, err = conn.ExecContext(ctx, `INSERT INTO random_number | |
( | |
timestamp, | |
value | |
) | |
VALUES | |
($1, $2) | |
`, | |
time.Now(), | |
rand.Int31(), | |
) | |
if err != nil { | |
err := conn.Close() | |
if err != nil { | |
log.Printf("[%s] closing connection: %s", tenantName, err.Error()) | |
} | |
log.Printf("[%s] inserting item: %s", tenantName, err.Error()) | |
time.Sleep(time.Second) | |
continue | |
} | |
err = conn.Close() | |
if err != nil { | |
log.Printf("[%s] closing connection: %s", tenantName, err.Error()) | |
} | |
time.Sleep(time.Second) | |
} | |
} | |
// RandomNumberTable is one row of a tenant's random_number table.
type RandomNumberTable struct {
	// Timestamp is scanned from the "timestamp" column.
	Timestamp time.Time
	// RandomNumber is scanned from the "value" column.
	RandomNumber int
}
func getRandomNumberTableByTenant(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) ([]RandomNumberTable, error) { | |
tenantConnection, ok := tenantMapper.TenantConn[tenantName] | |
if !ok { | |
return nil, fmt.Errorf("tenant is not set: %s", tenantName) | |
} | |
conn, err := tenantConnection.Conn(ctx) | |
if err != nil { | |
return nil, err | |
} | |
defer func() { | |
err := conn.Close() | |
if err != nil { | |
log.Printf("[%s] closing connection: %s", tenantName, err.Error()) | |
} | |
}() | |
rows, err := conn.QueryContext(ctx, `SELECT timestamp, value FROM random_number LIMIT 5000`) | |
if err != nil { | |
return nil, err | |
} | |
defer func() { | |
err := rows.Close() | |
if err != nil { | |
log.Printf("[%s] closing rows: %s", tenantName, err.Error()) | |
} | |
}() | |
var randomNumberTable []RandomNumberTable | |
for rows.Next() { | |
var entry RandomNumberTable | |
err := rows.Scan(&entry.Timestamp, &entry.RandomNumber) | |
if err != nil { | |
return nil, err | |
} | |
randomNumberTable = append(randomNumberTable, entry) | |
} | |
return randomNumberTable, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment