Skip to content

Instantly share code, notes, and snippets.

@aldy505
Created September 10, 2023 06:43
Show Gist options
  • Save aldy505/29f1a667600bb94254b7669733667049 to your computer and use it in GitHub Desktop.
Save aldy505/29f1a667600bb94254b7669733667049 to your computer and use it in GitHub Desktop.
Multi database on runtime proof-of-concept
services:
  database:
    image: postgres:15-alpine
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: password
module datarace-repro
go 1.21.1
require (
github.com/go-chi/chi/v5 v5.0.10
github.com/gofiber/fiber/v2 v2.49.1
github.com/lib/pq v1.10.9
)
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.49.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/sys v0.11.0 // indirect
)
// How to actually run this file:
//
// Start the postgresql server first by doing:
// docker compose up -d
//
// Then, choose a framework. If you want to run a net/http server,
// just do `go run .`
// But if you want to run a Fiber server (which is, for some reason, a bit buggy),
// do `go run . fiber`.
//
// A few endpoints that you can hit afterwards:
// GET http://localhost:4000/register/{name}
// GET http://localhost:4000/random/{name}
// GET http://localhost:4000/peek-map
//
// Enjoy!
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
_ "github.com/lib/pq"
)
// TenantConnectionMapper bundles the master database handle with the
// per-tenant database handles that are registered while the program runs.
type TenantConnectionMapper struct {
	// GlobalDatabaseConfig is the connection string shared by every tenant.
	// It carries no dbname; the tenant's database name is appended to it
	// when a tenant connection is opened.
	GlobalDatabaseConfig string

	// MasterConn is the handle on the master database, used to issue
	// CREATE DATABASE for new tenants.
	MasterConn *sql.DB

	// The embedded lock is intended to guard TenantConn: take the write lock
	// to add an entry and the read lock to look one up or iterate.
	sync.RWMutex

	// TenantConn maps a tenant name to that tenant's database handle.
	TenantConn map[string]*sql.DB
}
// tenantNames returns a snapshot of the names of all registered tenants.
// The map is read under the read lock because registrations write to it
// concurrently from other request goroutines.
func tenantNames(tenantMapper *TenantConnectionMapper) []string {
	tenantMapper.RLock()
	defer tenantMapper.RUnlock()

	var names []string
	for name := range tenantMapper.TenantConn {
		names = append(names, name)
	}
	return names
}

// main reads the database settings from the environment (falling back to
// the docker compose defaults), opens the master connection and serves the
// tenant endpoints on :4000. Passing "fiber" as the first argument serves
// them with Fiber, otherwise net/http with chi is used. On interrupt the
// server is shut down and every tenant connection is closed.
func main() {
	databaseHost, ok := os.LookupEnv("DB_HOST")
	if !ok {
		databaseHost = "localhost"
	}

	databasePort, ok := os.LookupEnv("DB_PORT")
	if !ok {
		databasePort = "5432"
	}

	databaseUser, ok := os.LookupEnv("DB_USER")
	if !ok {
		databaseUser = "postgres"
	}

	databasePassword, ok := os.LookupEnv("DB_PASSWORD")
	if !ok {
		databasePassword = "password"
	}

	databaseMasterName, ok := os.LookupEnv("DB_MASTER_NAME")
	if !ok {
		databaseMasterName = "postgres"
	}

	masterConn, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", databaseHost, databasePort, databaseUser, databasePassword, databaseMasterName))
	if err != nil {
		// log.Fatalf exits the process, so nothing after it would run.
		log.Fatalf("connecting to master database: %s", err.Error())
	}
	defer func() {
		err := masterConn.Close()
		if err != nil {
			log.Printf("closing master database: %s", err.Error())
		}
	}()

	tenantConnectionMapper := &TenantConnectionMapper{
		GlobalDatabaseConfig: fmt.Sprintf("host=%s port=%s user=%s password=%s sslmode=disable", databaseHost, databasePort, databaseUser, databasePassword),
		MasterConn:           masterConn,
		TenantConn:           make(map[string]*sql.DB),
	}

	if len(os.Args) > 1 && os.Args[1] == "fiber" {
		app := fiber.New()
		app.Use(logger.New())

		// NOTE: DO NOT use GET on production, use something else instead.
		// This is for having everything easy with cURL and the like.
		app.Get("/register/:tenant", func(c *fiber.Ctx) error {
			// Fiber reuses the buffers behind the values it returns from the
			// context, so the parameter is only valid inside this handler.
			// It is stored as a map key and handed to a long-running
			// goroutine below, hence the copy.
			tenant := strings.Clone(c.Params("tenant"))
			if tenant == "" {
				c.Status(fiber.StatusBadRequest)
				return c.JSON(map[string]string{"message": "tenant is empty"})
			}

			err := createNewTenantDatabase(c.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				return err
			}

			err = setupNewTenantTables(c.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				return err
			}

			go startRandomNumberSeed(tenantConnectionMapper, tenant)

			c.Status(fiber.StatusOK)
			return c.JSON(map[string]string{"message": "tenant has been created"})
		})

		app.Get("/random/:tenant", func(c *fiber.Ctx) error {
			// Copied for the same reason as in the register handler.
			tenant := strings.Clone(c.Params("tenant"))
			if tenant == "" {
				c.Status(fiber.StatusBadRequest)
				return c.JSON(map[string]string{"message": "tenant is empty"})
			}

			rows, err := getRandomNumberTableByTenant(c.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				return err
			}

			return c.JSON(rows)
		})

		app.Get("/peek-map", func(c *fiber.Ctx) error {
			return c.JSON(tenantNames(tenantConnectionMapper))
		})

		errChan := make(chan os.Signal, 1)
		signal.Notify(errChan, os.Interrupt)

		go func() {
			<-errChan
			if err := app.Shutdown(); err != nil {
				log.Printf("shutting down fiber server: %s", err.Error())
			}
		}()

		if err := app.Listen(":4000"); err != nil {
			log.Printf("serving fiber: %s", err.Error())
		}
	} else {
		app := chi.NewRouter()
		app.Use(middleware.Logger)

		app.Get("/register/{tenant}", func(w http.ResponseWriter, r *http.Request) {
			tenant := chi.URLParam(r, "tenant")
			if tenant == "" {
				w.WriteHeader(http.StatusBadRequest)
				json.NewEncoder(w).Encode(map[string]string{"message": "tenant is empty"})
				return
			}

			err := createNewTenantDatabase(r.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				return
			}

			err = setupNewTenantTables(r.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				return
			}

			go startRandomNumberSeed(tenantConnectionMapper, tenant)

			w.WriteHeader(http.StatusOK)
			json.NewEncoder(w).Encode(map[string]string{"message": "tenant has been created"})
		})

		app.Get("/random/{tenant}", func(w http.ResponseWriter, r *http.Request) {
			tenant := chi.URLParam(r, "tenant")
			if tenant == "" {
				w.WriteHeader(http.StatusBadRequest)
				json.NewEncoder(w).Encode(map[string]string{"message": "tenant is empty"})
				return
			}

			rows, err := getRandomNumberTableByTenant(r.Context(), tenantConnectionMapper, tenant)
			if err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				return
			}

			w.WriteHeader(http.StatusOK)
			json.NewEncoder(w).Encode(rows)
		})

		app.Get("/peek-map", func(w http.ResponseWriter, r *http.Request) {
			tenants := tenantNames(tenantConnectionMapper)

			w.WriteHeader(http.StatusOK)
			json.NewEncoder(w).Encode(tenants)
		})

		errChan := make(chan os.Signal, 1)
		signal.Notify(errChan, os.Interrupt)

		server := &http.Server{
			Handler: app,
			Addr:    ":4000",
		}

		// ListenAndServe returns as soon as Shutdown is called, while
		// Shutdown itself may still be waiting for in-flight requests.
		// shutdownDone lets main wait for it before closing the tenant
		// connections those requests may be using.
		shutdownDone := make(chan struct{})
		go func() {
			defer close(shutdownDone)
			<-errChan
			if err := server.Shutdown(context.Background()); err != nil {
				log.Printf("shutting down http server: %s", err.Error())
			}
		}()

		log.Printf("server started on %s", server.Addr)
		switch serveErr := server.ListenAndServe(); {
		case serveErr == http.ErrServerClosed:
			<-shutdownDone
		case serveErr != nil:
			log.Printf("serving http: %s", serveErr.Error())
		}
	}

	// Close the connection after the app is shut down
	tenantConnectionMapper.RLock()
	for tenant, conn := range tenantConnectionMapper.TenantConn {
		err := conn.Close()
		if err != nil {
			log.Printf("closing %s database: %s", tenant, err.Error())
		}
	}
	tenantConnectionMapper.RUnlock()
}
// isSafeTenantName reports whether name can be used verbatim as a database
// name: 1 to 63 bytes of lowercase ASCII letters, digits and underscores,
// not starting with a digit. The tenant name comes from the request URL and
// ends up in both a CREATE DATABASE statement and a connection string,
// neither of which can take it as a bound parameter, so anything else is
// rejected up front.
func isSafeTenantName(name string) bool {
	if name == "" || len(name) > 63 {
		return false
	}
	for i := 0; i < len(name); i++ {
		c := name[i]
		switch {
		case c >= 'a' && c <= 'z', c == '_':
		case c >= '0' && c <= '9':
			if i == 0 {
				return false
			}
		default:
			return false
		}
	}
	return true
}

// createNewTenantDatabase creates a database named tenantName through the
// master connection and registers a handle for it in tenantMapper.TenantConn.
//
// A database that already exists is not an error, so a tenant can be
// registered again (for instance after a restart, when the in-memory map is
// empty). If the tenant already has a handle in the map, that handle is kept
// rather than replaced, so it is not leaked unclosed.
//
// It returns an error when tenantName is not a safe database name, when
// CREATE DATABASE fails for any reason other than the database already
// existing, or when the tenant handle cannot be opened.
func createNewTenantDatabase(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) error {
	if !isSafeTenantName(tenantName) {
		return fmt.Errorf("invalid tenant name: %q", tenantName)
	}

	// The name is quoted so that reserved words are accepted; quoting is safe
	// here because the validation above rules out quote characters.
	_, err := tenantMapper.MasterConn.ExecContext(ctx, `CREATE DATABASE "`+tenantName+`"`)
	if err != nil && !strings.Contains(err.Error(), "already exists") {
		return fmt.Errorf("creating database %q: %w", tenantName, err)
	}

	tenantMapper.Lock()
	defer tenantMapper.Unlock()

	if _, ok := tenantMapper.TenantConn[tenantName]; ok {
		return nil
	}

	tenantConnection, err := sql.Open("postgres", tenantMapper.GlobalDatabaseConfig+" dbname="+tenantName)
	if err != nil {
		return fmt.Errorf("opening database %q: %w", tenantName, err)
	}

	tenantMapper.TenantConn[tenantName] = tenantConnection
	return nil
}
// setupNewTenantTables creates the random_number table in the tenant's
// database if it does not exist yet.
//
// It returns an error when the tenant has no registered connection, when a
// connection cannot be acquired, or when the CREATE TABLE statement fails.
func setupNewTenantTables(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) error {
	// Even a plain lookup needs the read lock: another request may be
	// registering a tenant, and a map read concurrent with a map write is a
	// data race.
	tenantMapper.RLock()
	tenantConnection, ok := tenantMapper.TenantConn[tenantName]
	tenantMapper.RUnlock()
	if !ok {
		return fmt.Errorf("tenant is not set: %s", tenantName)
	}

	conn, err := tenantConnection.Conn(ctx)
	if err != nil {
		return err
	}
	defer func() {
		err := conn.Close()
		if err != nil {
			log.Printf("[%s] closing connection: %s", tenantName, err.Error())
		}
	}()

	_, err = conn.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS random_number (
		timestamp TIMESTAMP DEFAULT NOW(),
		value INTEGER NOT NULL
	)`)
	return err
}
// insertRandomNumber acquires a connection from tenantConnection, inserts
// one row holding the current time and a random 31-bit integer into
// random_number, and releases the connection again. A failure to release
// the connection is logged under tenantName; a failure to acquire it or to
// insert is returned.
func insertRandomNumber(ctx context.Context, tenantConnection *sql.DB, tenantName string) error {
	conn, err := tenantConnection.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquiring connection from pool: %w", err)
	}
	defer func() {
		// A separate name keeps the close error from hiding the insert error.
		if closeErr := conn.Close(); closeErr != nil {
			log.Printf("[%s] closing connection: %s", tenantName, closeErr.Error())
		}
	}()

	_, err = conn.ExecContext(ctx, `INSERT INTO random_number
		(
			timestamp,
			value
		)
		VALUES
		($1, $2)
		`,
		time.Now(),
		rand.Int31(),
	)
	if err != nil {
		return fmt.Errorf("inserting item: %w", err)
	}
	return nil
}

// startRandomNumberSeed inserts one random number per second into the
// tenant's random_number table. It loops forever and is meant to be started
// in its own goroutine; failures are logged and retried on the next tick.
func startRandomNumberSeed(tenantMapper *TenantConnectionMapper, tenantName string) {
	ctx := context.Background()

	for {
		// The lookup is repeated every tick and done under the read lock,
		// since registrations write to the map from other goroutines.
		tenantMapper.RLock()
		tenantConnection, ok := tenantMapper.TenantConn[tenantName]
		tenantMapper.RUnlock()

		if !ok {
			log.Printf("tenant not found: %s", tenantName)
		} else if err := insertRandomNumber(ctx, tenantConnection, tenantName); err != nil {
			log.Printf("[%s] %s", tenantName, err.Error())
		}

		time.Sleep(time.Second)
	}
}
// RandomNumberTable is one row read from a tenant's random_number table.
// The handlers encode it as JSON as-is, so the field names double as the
// JSON keys.
type RandomNumberTable struct {
// Timestamp is scanned from the row's timestamp column.
Timestamp time.Time
// RandomNumber is scanned from the row's value column.
RandomNumber int
}
// getRandomNumberTableByTenant returns up to 5000 rows from the tenant's
// random_number table. The result is nil when the table is empty.
//
// It returns an error when the tenant has no registered connection, when a
// connection cannot be acquired, or when querying, scanning or iterating
// the rows fails.
func getRandomNumberTableByTenant(ctx context.Context, tenantMapper *TenantConnectionMapper, tenantName string) ([]RandomNumberTable, error) {
	// Read the map under the read lock; registrations write to it from other
	// request goroutines.
	tenantMapper.RLock()
	tenantConnection, ok := tenantMapper.TenantConn[tenantName]
	tenantMapper.RUnlock()
	if !ok {
		return nil, fmt.Errorf("tenant is not set: %s", tenantName)
	}

	conn, err := tenantConnection.Conn(ctx)
	if err != nil {
		return nil, err
	}
	defer func() {
		err := conn.Close()
		if err != nil {
			log.Printf("[%s] closing connection: %s", tenantName, err.Error())
		}
	}()

	rows, err := conn.QueryContext(ctx, `SELECT timestamp, value FROM random_number LIMIT 5000`)
	if err != nil {
		return nil, err
	}
	defer func() {
		err := rows.Close()
		if err != nil {
			log.Printf("[%s] closing rows: %s", tenantName, err.Error())
		}
	}()

	var randomNumberTable []RandomNumberTable
	for rows.Next() {
		var entry RandomNumberTable
		err := rows.Scan(&entry.Timestamp, &entry.RandomNumber)
		if err != nil {
			return nil, err
		}

		randomNumberTable = append(randomNumberTable, entry)
	}
	// Next also returns false when iteration fails part-way; without this
	// check a truncated result would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return randomNumberTable, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment