Skip to content

Instantly share code, notes, and snippets.

@orian
Created March 17, 2026 14:40
Show Gist options
  • Select an option

  • Save orian/45ed082d7eb6aad5855aba0d431b79ba to your computer and use it in GitHub Desktop.

Select an option

Save orian/45ed082d7eb6aad5855aba0d431b79ba to your computer and use it in GitHub Desktop.
drop properties
package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"strings"
"time"
_ "github.com/ClickHouse/clickhouse-go/v2"
)
// Config describes one drop-properties job, decoded from the JSON file passed
// via --input. All five fields are required (see validateConfig).
type Config struct {
	// TeamID limits every generated query to rows with this team_id.
	TeamID int `json:"team_id"`

	// StartTime and EndTime are the inclusive bounds of the
	// `timestamp BETWEEN ... AND ...` filter. They are inserted into the SQL
	// text as-is, inside single quotes.
	StartTime string `json:"start_time"`
	EndTime   string `json:"end_time"`

	// Events are the event names whose rows get rewritten (event IN (...)).
	Events []string `json:"events"`

	// Properties are the top-level keys to strip from the properties JSON
	// column of the selected rows.
	Properties []string `json:"properties"`
}
// main is the CLI entry point. It loads and validates the JSON job config
// named by --input, then runs one of three modes:
//
//   - default (dry run): print every SQL statement; no connection is opened;
//   - --introspect: connect, run only the count query, print the rest;
//   - --execute: connect and run every step, modifying data.
//
// --execute and --introspect are mutually exclusive. Any setup failure exits
// the process with a non-zero status.
func main() {
	inputPath := flag.String("input", "", "path to JSON config file (required)")
	execute := flag.Bool("execute", false, "actually run the workflow (default: dry-run)")
	introspect := flag.Bool("introspect", false, "run count query only, then print remaining SQL")
	dsn := flag.String("dsn", "clickhouse://localhost:9440/default?secure=true", "ClickHouse DSN")
	flag.Parse()

	if *inputPath == "" {
		fmt.Fprintln(os.Stderr, "error: --input is required")
		flag.Usage()
		os.Exit(1)
	}
	// With both flags it is unclear whether the operator wanted a read-only
	// look or a destructive run; --execute rewrites data, so refuse rather
	// than guess.
	if *execute && *introspect {
		fmt.Fprintln(os.Stderr, "error: --execute and --introspect are mutually exclusive")
		flag.Usage()
		os.Exit(1)
	}

	data, err := os.ReadFile(*inputPath)
	if err != nil {
		log.Fatalf("failed to read input file: %v", err)
	}
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		log.Fatalf("failed to parse config: %v", err)
	}
	if err := validateConfig(cfg); err != nil {
		log.Fatalf("invalid config: %v", err)
	}

	// The Unix-seconds suffix keeps repeated runs for the same team from
	// colliding on the staging table name.
	tempTable := fmt.Sprintf("team_%d_drop_props_%d", cfg.TeamID, time.Now().Unix())
	w := &workflow{
		cfg:       cfg,
		tempTable: tempTable,
		dsn:       *dsn,
	}

	// Only the modes that talk to ClickHouse open a connection; dry run
	// leaves w.db nil.
	if *execute || *introspect {
		db, err := sql.Open("clickhouse", *dsn)
		if err != nil {
			log.Fatalf("failed to connect to ClickHouse: %v", err)
		}
		defer db.Close()
		ctx := context.Background()
		// sql.Open does not dial; Ping verifies the DSN actually works.
		if err := db.PingContext(ctx); err != nil {
			log.Fatalf("failed to ping ClickHouse: %v", err)
		}
		fmt.Println("connected to ClickHouse")
		w.db = db
	}

	switch {
	case *execute:
		w.runAll()
	case *introspect:
		w.runIntrospect()
	default:
		w.runDryRun()
	}
}
// validateConfig reports the first problem found in cfg, or nil if cfg is
// usable.
//
// Besides requiring every field, it rejects values the workflow cannot embed
// in SQL safely. Event names, property keys and the time bounds are spliced
// verbatim, with no escaping, into single-quoted ClickHouse string literals
// by the SQL builders (eventInClause, propsArrayLiteral, jsonHasFilter,
// whereClause, sqlVerifyUpdate, sqlVerifyFinal). A ' or \ inside them would
// therefore break or change the meaning of the generated UPDATE/DELETE
// statements. Empty event names and property keys are rejected as well.
//
// NOTE(review): start_time/end_time are not checked for a valid timestamp
// format; ClickHouse will reject malformed values when the count query runs.
func validateConfig(cfg Config) error {
	switch {
	case cfg.TeamID == 0:
		return fmt.Errorf("team_id is required")
	case cfg.TeamID < 0:
		return fmt.Errorf("team_id must be positive, got %d", cfg.TeamID)
	case cfg.StartTime == "":
		return fmt.Errorf("start_time is required")
	case cfg.EndTime == "":
		return fmt.Errorf("end_time is required")
	case len(cfg.Events) == 0:
		return fmt.Errorf("at least one event is required")
	case len(cfg.Properties) == 0:
		return fmt.Errorf("at least one property is required")
	}

	if err := checkSQLLiteral("start_time", cfg.StartTime); err != nil {
		return err
	}
	if err := checkSQLLiteral("end_time", cfg.EndTime); err != nil {
		return err
	}
	if err := checkSQLNames("events", cfg.Events); err != nil {
		return err
	}
	return checkSQLNames("properties", cfg.Properties)
}

// checkSQLNames rejects empty entries in names, and entries that cannot be
// placed inside a single-quoted SQL literal unescaped. field is the JSON key
// used in error messages.
func checkSQLNames(field string, names []string) error {
	for i, name := range names {
		label := fmt.Sprintf("%s[%d]", field, i)
		if name == "" {
			return fmt.Errorf("%s is empty", label)
		}
		if err := checkSQLLiteral(label, name); err != nil {
			return err
		}
	}
	return nil
}

// checkSQLLiteral returns an error if value contains a character that would
// terminate (') or escape (\) a single-quoted ClickHouse string literal.
func checkSQLLiteral(field, value string) error {
	if i := strings.IndexAny(value, `'\`); i >= 0 {
		return fmt.Errorf("%s %q contains forbidden character %q", field, value, value[i])
	}
	return nil
}
// workflow holds the state shared by every step of a run: the validated job
// config, the name of the staging table, and the database handle.
type workflow struct {
	cfg Config // job parameters from the --input file

	// tempTable is the staging table rows are copied into and rewritten in;
	// main names it team_<id>_drop_props_<unix seconds>.
	tempTable string

	// dsn is the connection string passed on the command line.
	// NOTE(review): no method in this file reads it.
	dsn string

	// db is nil in dry-run mode; main only opens it for --execute/--introspect.
	db *sql.DB
}
// SQL building helpers
// eventInClause renders the configured event names as a comma-separated list
// of single-quoted literals, e.g. 'a', 'b', for use inside `event IN (...)`.
// Names are wrapped in quotes but not escaped. An empty list yields "".
func (w *workflow) eventInClause() string {
	var sb strings.Builder
	for idx, name := range w.cfg.Events {
		if idx > 0 {
			sb.WriteString(", ")
		}
		fmt.Fprintf(&sb, "'%s'", name)
	}
	return sb.String()
}
// propsArrayLiteral renders the configured property keys as a ClickHouse
// array literal of single-quoted strings, e.g. ['a', 'b']. Keys are wrapped
// in quotes but not escaped. An empty list yields "[]".
func (w *workflow) propsArrayLiteral() string {
	items := make([]string, 0, len(w.cfg.Properties))
	for _, key := range w.cfg.Properties {
		items = append(items, "'"+key+"'")
	}
	return fmt.Sprintf("[%s]", strings.Join(items, ", "))
}
// jsonHasFilter builds a parenthesised SQL predicate that is true when the
// properties JSON column contains at least one of the configured keys:
// (JSONHas(properties, 'k1') OR JSONHas(properties, 'k2') ...).
// Keys are wrapped in quotes but not escaped. An empty list yields "()".
func (w *workflow) jsonHasFilter() string {
	var sb strings.Builder
	sb.WriteByte('(')
	for idx, key := range w.cfg.Properties {
		if idx != 0 {
			sb.WriteString(" OR ")
		}
		fmt.Fprintf(&sb, "JSONHas(properties, '%s')", key)
	}
	sb.WriteByte(')')
	return sb.String()
}
// whereClause returns the row filter shared by the count, copy and delete
// statements. Its four conditions are joined by "\n AND ":
//
//	team_id = <TeamID>
//	event IN (<events>)
//	(JSONHas(properties, ...) OR ...)
//	timestamp BETWEEN '<StartTime>' AND '<EndTime>'
func (w *workflow) whereClause() string {
	const sep = "\n AND "
	cfg := w.cfg
	return fmt.Sprintf("team_id = %d", cfg.TeamID) +
		sep + "event IN (" + w.eventInClause() + ")" +
		sep + w.jsonHasFilter() +
		sep + fmt.Sprintf("timestamp BETWEEN '%s' AND '%s'", cfg.StartTime, cfg.EndTime)
}
// Step SQL generators
// sqlCount returns step 1: count the sharded_events rows matching
// whereClause, i.e. the rows the workflow will rewrite.
func (w *workflow) sqlCount() string {
	return fmt.Sprint("SELECT count(*)\nFROM sharded_events\nWHERE ", w.whereClause())
}
// sqlCreateTemp returns step 2: create the staging table with the structure
// of sharded_events but a plain MergeTree() engine.
//
// NOTE(review): no ORDER BY is given for MergeTree here; confirm the target
// ClickHouse version accepts this for CREATE TABLE ... AS ... ENGINE = ...
func (w *workflow) sqlCreateTemp() string {
	return fmt.Sprint("CREATE TABLE ", w.tempTable, " AS sharded_events ENGINE = MergeTree()")
}
// sqlCopyToTemp returns step 3: copy every sharded_events row matching
// whereClause into the staging table, all columns unchanged.
func (w *workflow) sqlCopyToTemp() string {
	const copyTmpl = "INSERT INTO %s\n" +
		"SELECT *\n" +
		"FROM sharded_events\n" +
		"WHERE %s"
	return fmt.Sprintf(copyTmpl, w.tempTable, w.whereClause())
}
// sqlCountTemp returns step 4: count all rows in the staging table, so the
// copy can be compared against step 1's count.
func (w *workflow) sqlCountTemp() string {
	return fmt.Sprint("SELECT count() FROM ", w.tempTable)
}
// sqlUpdateProps returns step 5: an ALTER TABLE ... UPDATE mutation on the
// staging table. It rewrites properties through
// JSONDropKeys(<property array>)(properties) and sets inserted_at = now().
// Filtering on team_id alone is enough because step 3 copied only matching
// rows into the staging table.
//
// NOTE(review): JSONDropKeys is not defined in this file; presumably a
// server-side function that removes the listed keys. Confirm it exists on
// the target cluster.
func (w *workflow) sqlUpdateProps() string {
	return fmt.Sprintf(
		"ALTER TABLE %[1]s\nUPDATE properties = JSONDropKeys(%[2]s)(properties), inserted_at = now()\nWHERE team_id = %[3]d",
		w.tempTable, w.propsArrayLiteral(), w.cfg.TeamID,
	)
}
// sqlVerifyUpdate returns step 6: count staging-table rows (same team, events
// and time window) whose top-level JSON keys still include any of the
// configured properties. A result of 0 means step 5 fully applied.
func (w *workflow) sqlVerifyUpdate() string {
	from := fmt.Sprintf("SELECT count()\nFROM %s\n", w.tempTable)
	filter := fmt.Sprintf(
		"WHERE team_id = %d\nAND event IN (%s)\nAND timestamp BETWEEN '%s' AND '%s'\n",
		w.cfg.TeamID, w.eventInClause(), w.cfg.StartTime, w.cfg.EndTime,
	)
	return from + filter + "AND hasAny(JSONExtractKeys(properties), " + w.propsArrayLiteral() + ")"
}
// sqlDeleteOriginals returns step 7: a DELETE of the original rows matching
// whereClause from sharded_events. It runs with max_execution_time = 10000
// (seconds) and lightweight_deletes_sync = 1. NOTE(review): the latter is
// presumably meant to make the statement wait for the delete to apply;
// confirm against the server version in use.
func (w *workflow) sqlDeleteOriginals() string {
	const (
		head     = "DELETE FROM sharded_events"
		settings = "SETTINGS max_execution_time = 10000, lightweight_deletes_sync = 1"
	)
	return fmt.Sprintf("%s\nWHERE %s\n%s", head, w.whereClause(), settings)
}
// sqlInsertBack returns step 8: copy every staging-table row (now with the
// keys dropped) back into sharded_events, with max_execution_time = 1800.
func (w *workflow) sqlInsertBack() string {
	const settings = "SETTINGS max_execution_time = 1800"
	return fmt.Sprint("INSERT INTO sharded_events\nSELECT *\nFROM ", w.tempTable, "\n", settings)
}
// sqlVerifyFinal returns step 9: the same check as sqlVerifyUpdate, but run
// against sharded_events. A result of 0 means no matching row in the window
// still carries any of the configured keys.
func (w *workflow) sqlVerifyFinal() string {
	const tmpl = "SELECT count()\n" +
		"FROM sharded_events\n" +
		"WHERE team_id = %d\n" +
		"AND event IN (%s)\n" +
		"AND timestamp BETWEEN '%s' AND '%s'\n" +
		"AND hasAny(JSONExtractKeys(properties), %s)"
	c := w.cfg
	return fmt.Sprintf(tmpl, c.TeamID, w.eventInClause(), c.StartTime, c.EndTime, w.propsArrayLiteral())
}
// sqlDropTemp returns step 10 (also used for cleanup): drop the staging table.
func (w *workflow) sqlDropTemp() string {
	return fmt.Sprint("DROP TABLE ", w.tempTable)
}
// Execution helpers
// printStep writes a "--- Step n: name ---" banner, preceded by a blank line,
// to stdout. It then writes the query followed by ";" so the output can be
// pasted into a SQL client.
func printStep(n int, name, query string) {
	header := fmt.Sprintf("\n--- Step %d: %s ---", n, name)
	fmt.Println(header)
	fmt.Println(query + ";")
}
// execQuery runs a statement that returns no rows on w.db. The sql.Result is
// discarded; only the error is reported. w.db must be non-nil.
func (w *workflow) execQuery(ctx context.Context, query string) error {
	if _, err := w.db.ExecContext(ctx, query); err != nil {
		return err
	}
	return nil
}
// queryCount runs a query expected to yield a single numeric column and scans
// the first row's value into a uint64. The Scan error, including
// sql.ErrNoRows for an empty result, is returned unchanged. w.db must be
// non-nil.
func (w *workflow) queryCount(ctx context.Context, query string) (uint64, error) {
	row := w.db.QueryRowContext(ctx, query)
	var n uint64
	if err := row.Scan(&n); err != nil {
		return n, err
	}
	return n, nil
}
// waitForMutations blocks until ClickHouse reports no unfinished mutations
// (is_done = 0) for the staging table in the connection's current database.
// It polls system.mutations every five seconds.
//
// While waiting it prints how many mutations are pending. When ClickHouse
// has recorded a failure for one of them, it also prints the latest failure
// reason, so a mutation that keeps failing is visible instead of the loop
// spinning silently. It returns ctx.Err() if ctx is cancelled between polls,
// or a wrapped error if the poll query itself fails.
func (w *workflow) waitForMutations(ctx context.Context) error {
	const pollInterval = 5 * time.Second
	fmt.Println("waiting for mutations to finish...")
	// anyIf over zero rows yields '', so failReason is empty unless some
	// pending mutation has recorded a failure.
	query := fmt.Sprintf(
		"SELECT count(), anyIf(latest_fail_reason, latest_fail_reason != '') FROM system.mutations WHERE database = currentDatabase() AND table = '%s' AND is_done = 0",
		w.tempTable,
	)
	for {
		var (
			pending    uint64
			failReason string
		)
		if err := w.db.QueryRowContext(ctx, query).Scan(&pending, &failReason); err != nil {
			return fmt.Errorf("polling mutations: %w", err)
		}
		if pending == 0 {
			fmt.Println("all mutations finished")
			return nil
		}
		fmt.Printf(" %d mutation(s) still running...\n", pending)
		if failReason != "" {
			fmt.Printf(" latest failure reported by ClickHouse: %s\n", failReason)
		}
		// Sleep, but stay responsive to cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}
// Run modes
// runDryRun prints all ten workflow statements in execution order without
// running any of them. w.db is never used, so it works with no connection.
func (w *workflow) runDryRun() {
	fmt.Println("=== DRY RUN (no ClickHouse connection) ===")
	fmt.Printf("temp table: %s\n", w.tempTable)

	plan := []struct {
		title string
		build func() string
	}{
		{"Count matching events", w.sqlCount},
		{"Create temp table", w.sqlCreateTemp},
		{"Copy events to temp table", w.sqlCopyToTemp},
		{"Count copied events", w.sqlCountTemp},
		{"Update properties (drop keys)", w.sqlUpdateProps},
		{"Verify update (expect 0)", w.sqlVerifyUpdate},
		{"Delete originals", w.sqlDeleteOriginals},
		{"Insert back from temp", w.sqlInsertBack},
		{"Verify final (expect 0)", w.sqlVerifyFinal},
		{"Drop temp table", w.sqlDropTemp},
	}
	for i, step := range plan {
		printStep(i+1, step.title, step.build())
	}

	fmt.Println("\n=== END DRY RUN ===")
}
// runIntrospect executes only the read-only count query (step 1) and prints
// its result. Steps 2-10 are printed but not executed. A failed count
// terminates the process via log.Fatalf. Requires a non-nil w.db.
func (w *workflow) runIntrospect() {
	ctx := context.Background()
	fmt.Println("=== INTROSPECT MODE ===")
	fmt.Printf("temp table: %s\n", w.tempTable)

	// The count is the only statement actually sent to ClickHouse.
	countSQL := w.sqlCount()
	printStep(1, "Count matching events", countSQL)
	matching, err := w.queryCount(ctx, countSQL)
	if err != nil {
		log.Fatalf("count query failed: %v", err)
	}
	fmt.Printf(">>> result: %d matching events\n", matching)

	// Everything after the count creates or modifies data, so it is only shown.
	rest := []struct {
		title string
		build func() string
	}{
		{"Create temp table", w.sqlCreateTemp},
		{"Copy events to temp table", w.sqlCopyToTemp},
		{"Count copied events", w.sqlCountTemp},
		{"Update properties (drop keys)", w.sqlUpdateProps},
		{"Verify update (expect 0)", w.sqlVerifyUpdate},
		{"Delete originals", w.sqlDeleteOriginals},
		{"Insert back from temp", w.sqlInsertBack},
		{"Verify final (expect 0)", w.sqlVerifyFinal},
		{"Drop temp table", w.sqlDropTemp},
	}
	for offset, step := range rest {
		printStep(offset+2, step.title, step.build())
	}

	fmt.Println("\n=== END INTROSPECT ===")
}
// runAll executes the whole workflow against ClickHouse, printing each
// statement before running it:
//
//  1. count matching rows in sharded_events (stop if zero);
//  2. create the staging table;
//  3. copy matching rows into it;
//  4. check the copied count equals step 1's count;
//  5. drop the keys in the staging table and wait for the mutation;
//  6. verify no staging row still has a dropped key;
//  7. delete the original rows from sharded_events;
//  8. insert the rewritten rows back;
//  9. verify no matching sharded_events row still has a dropped key;
//  10. drop the staging table.
//
// Any failure terminates the process via log.Fatalf. For failures in steps
// 3-6, sharded_events is still untouched, so the staging table is dropped.
// Once step 7 has been issued, the staging table is deliberately kept. A
// failed or timed-out DELETE/INSERT may still have partially applied on the
// server, so the staging table can hold the only copy of the rewritten rows.
// Requires a non-nil w.db.
func (w *workflow) runAll() {
	ctx := context.Background()
	fmt.Println("=== EXECUTE MODE ===")
	fmt.Printf("temp table: %s\n", w.tempTable)

	// Step 1: Count
	q := w.sqlCount()
	printStep(1, "Count matching events", q)
	expectedCount, err := w.queryCount(ctx, q)
	if err != nil {
		log.Fatalf("step 1 failed: %v", err)
	}
	fmt.Printf(">>> %d matching events\n", expectedCount)
	if expectedCount == 0 {
		fmt.Println("no matching events found, nothing to do")
		return
	}

	// Step 2: Create temp table
	q = w.sqlCreateTemp()
	printStep(2, "Create temp table", q)
	if err := w.execQuery(ctx, q); err != nil {
		log.Fatalf("step 2 failed: %v", err)
	}
	fmt.Println(">>> done")

	// cleanup drops the staging table. It is only safe while sharded_events
	// still holds every original row, i.e. for failures in steps 3-6.
	cleanup := func() {
		fmt.Printf("\ncleaning up: dropping temp table %s\n", w.tempTable)
		if err := w.execQuery(ctx, w.sqlDropTemp()); err != nil {
			// Don't mask the step failure being reported; just tell the operator.
			fmt.Fprintf(os.Stderr, "warning: could not drop temp table %s, drop it manually: %v\n", w.tempTable, err)
		}
	}
	// keepTemp replaces cleanup for failures in steps 7-9, where the staging
	// table may hold the only copy of the rewritten rows.
	keepTemp := func() {
		fmt.Fprintf(os.Stderr, "\ntemp table %s was NOT dropped: it may hold the only copy of the rewritten rows; verify sharded_events and finish the remaining steps manually\n", w.tempTable)
	}

	// Step 3: Copy events
	q = w.sqlCopyToTemp()
	printStep(3, "Copy events to temp table", q)
	if err := w.execQuery(ctx, q); err != nil {
		cleanup()
		log.Fatalf("step 3 failed: %v", err)
	}
	fmt.Println(">>> done")

	// Step 4: Count copied
	q = w.sqlCountTemp()
	printStep(4, "Count copied events", q)
	copiedCount, err := w.queryCount(ctx, q)
	if err != nil {
		cleanup()
		log.Fatalf("step 4 failed: %v", err)
	}
	fmt.Printf(">>> %d rows in temp table\n", copiedCount)
	if copiedCount != expectedCount {
		cleanup()
		log.Fatalf("step 4 verification failed: expected %d rows but got %d", expectedCount, copiedCount)
	}
	fmt.Println(">>> counts match")

	// Step 5: Update properties
	q = w.sqlUpdateProps()
	printStep(5, "Update properties (drop keys)", q)
	if err := w.execQuery(ctx, q); err != nil {
		cleanup()
		log.Fatalf("step 5 failed: %v", err)
	}
	fmt.Println(">>> mutation submitted")
	// ALTER ... UPDATE is submitted as a mutation; wait until it has applied
	// before verifying.
	if err := w.waitForMutations(ctx); err != nil {
		cleanup()
		log.Fatalf("step 5 mutation wait failed: %v", err)
	}

	// Step 6: Verify update
	q = w.sqlVerifyUpdate()
	printStep(6, "Verify update (expect 0)", q)
	remaining, err := w.queryCount(ctx, q)
	if err != nil {
		cleanup()
		log.Fatalf("step 6 failed: %v", err)
	}
	fmt.Printf(">>> %d rows still have dropped keys\n", remaining)
	if remaining != 0 {
		cleanup()
		log.Fatalf("step 6 verification failed: %d rows still have properties that should have been dropped", remaining)
	}

	// Step 7: Delete originals. From here on the staging table must survive
	// any failure.
	q = w.sqlDeleteOriginals()
	printStep(7, "Delete originals", q)
	if err := w.execQuery(ctx, q); err != nil {
		keepTemp()
		log.Fatalf("step 7 failed: %v", err)
	}
	fmt.Println(">>> done")

	// Step 8: Insert back
	q = w.sqlInsertBack()
	printStep(8, "Insert back from temp", q)
	if err := w.execQuery(ctx, q); err != nil {
		keepTemp()
		log.Fatalf("step 8 failed: %v", err)
	}
	fmt.Println(">>> done")

	// Step 9: Verify final
	q = w.sqlVerifyFinal()
	printStep(9, "Verify final (expect 0)", q)
	finalCount, err := w.queryCount(ctx, q)
	if err != nil {
		keepTemp()
		log.Fatalf("step 9 failed: %v", err)
	}
	fmt.Printf(">>> %d rows still have dropped keys\n", finalCount)
	if finalCount != 0 {
		keepTemp()
		log.Fatalf("step 9 verification failed: %d rows in sharded_events still have properties that should have been dropped", finalCount)
	}

	// Step 10: Drop temp table
	q = w.sqlDropTemp()
	printStep(10, "Drop temp table", q)
	if err := w.execQuery(ctx, q); err != nil {
		log.Fatalf("step 10 failed: %v", err)
	}
	fmt.Println(">>> done")
	fmt.Println("\n=== COMPLETE ===")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment