Created
March 17, 2026 14:40
-
-
Save orian/45ed082d7eb6aad5855aba0d431b79ba to your computer and use it in GitHub Desktop.
drop properties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "database/sql" | |
| "encoding/json" | |
| "flag" | |
| "fmt" | |
| "log" | |
| "os" | |
| "strings" | |
| "time" | |
| _ "github.com/ClickHouse/clickhouse-go/v2" | |
| ) | |
// Config is the JSON document read from the --input file. It selects which
// events are rewritten and which property keys are removed from them.
type Config struct {
	// TeamID restricts every generated statement to a single team.
	TeamID int `json:"team_id"`
	// StartTime and EndTime bound the event timestamp range. Both are
	// interpolated into the generated SQL as string literals inside a
	// BETWEEN predicate.
	StartTime string `json:"start_time"`
	EndTime   string `json:"end_time"`
	// Events lists the event names to match.
	Events []string `json:"events"`
	// Properties lists the JSON keys to drop from each matching event's
	// properties column.
	Properties []string `json:"properties"`
}
| func main() { | |
| inputPath := flag.String("input", "", "path to JSON config file (required)") | |
| execute := flag.Bool("execute", false, "actually run the workflow (default: dry-run)") | |
| introspect := flag.Bool("introspect", false, "run count query only, then print remaining SQL") | |
| dsn := flag.String("dsn", "clickhouse://localhost:9440/default?secure=true", "ClickHouse DSN") | |
| flag.Parse() | |
| if *inputPath == "" { | |
| fmt.Fprintln(os.Stderr, "error: --input is required") | |
| flag.Usage() | |
| os.Exit(1) | |
| } | |
| data, err := os.ReadFile(*inputPath) | |
| if err != nil { | |
| log.Fatalf("failed to read input file: %v", err) | |
| } | |
| var cfg Config | |
| if err := json.Unmarshal(data, &cfg); err != nil { | |
| log.Fatalf("failed to parse config: %v", err) | |
| } | |
| if err := validateConfig(cfg); err != nil { | |
| log.Fatalf("invalid config: %v", err) | |
| } | |
| tempTable := fmt.Sprintf("team_%d_drop_props_%d", cfg.TeamID, time.Now().Unix()) | |
| w := &workflow{ | |
| cfg: cfg, | |
| tempTable: tempTable, | |
| dsn: *dsn, | |
| } | |
| if *execute || *introspect { | |
| db, err := sql.Open("clickhouse", *dsn) | |
| if err != nil { | |
| log.Fatalf("failed to connect to ClickHouse: %v", err) | |
| } | |
| defer db.Close() | |
| ctx := context.Background() | |
| if err := db.PingContext(ctx); err != nil { | |
| log.Fatalf("failed to ping ClickHouse: %v", err) | |
| } | |
| fmt.Println("connected to ClickHouse") | |
| w.db = db | |
| } | |
| if *execute { | |
| w.runAll() | |
| } else if *introspect { | |
| w.runIntrospect() | |
| } else { | |
| w.runDryRun() | |
| } | |
| } | |
| func validateConfig(cfg Config) error { | |
| if cfg.TeamID == 0 { | |
| return fmt.Errorf("team_id is required") | |
| } | |
| if cfg.StartTime == "" { | |
| return fmt.Errorf("start_time is required") | |
| } | |
| if cfg.EndTime == "" { | |
| return fmt.Errorf("end_time is required") | |
| } | |
| if len(cfg.Events) == 0 { | |
| return fmt.Errorf("at least one event is required") | |
| } | |
| if len(cfg.Properties) == 0 { | |
| return fmt.Errorf("at least one property is required") | |
| } | |
| return nil | |
| } | |
| type workflow struct { | |
| cfg Config | |
| tempTable string | |
| dsn string | |
| db *sql.DB | |
| } | |
| // SQL building helpers | |
| func (w *workflow) eventInClause() string { | |
| quoted := make([]string, len(w.cfg.Events)) | |
| for i, e := range w.cfg.Events { | |
| quoted[i] = fmt.Sprintf("'%s'", e) | |
| } | |
| return strings.Join(quoted, ", ") | |
| } | |
| func (w *workflow) propsArrayLiteral() string { | |
| quoted := make([]string, len(w.cfg.Properties)) | |
| for i, p := range w.cfg.Properties { | |
| quoted[i] = fmt.Sprintf("'%s'", p) | |
| } | |
| return "[" + strings.Join(quoted, ", ") + "]" | |
| } | |
| func (w *workflow) jsonHasFilter() string { | |
| clauses := make([]string, len(w.cfg.Properties)) | |
| for i, p := range w.cfg.Properties { | |
| clauses[i] = fmt.Sprintf("JSONHas(properties, '%s')", p) | |
| } | |
| return "(" + strings.Join(clauses, " OR ") + ")" | |
| } | |
| func (w *workflow) whereClause() string { | |
| return fmt.Sprintf( | |
| "team_id = %d\n AND event IN (%s)\n AND %s\n AND timestamp BETWEEN '%s' AND '%s'", | |
| w.cfg.TeamID, | |
| w.eventInClause(), | |
| w.jsonHasFilter(), | |
| w.cfg.StartTime, | |
| w.cfg.EndTime, | |
| ) | |
| } | |
| // Step SQL generators | |
| func (w *workflow) sqlCount() string { | |
| return fmt.Sprintf(`SELECT count(*) | |
| FROM sharded_events | |
| WHERE %s`, w.whereClause()) | |
| } | |
| func (w *workflow) sqlCreateTemp() string { | |
| return fmt.Sprintf(`CREATE TABLE %s AS sharded_events ENGINE = MergeTree()`, w.tempTable) | |
| } | |
| func (w *workflow) sqlCopyToTemp() string { | |
| return fmt.Sprintf(`INSERT INTO %s | |
| SELECT * | |
| FROM sharded_events | |
| WHERE %s`, w.tempTable, w.whereClause()) | |
| } | |
| func (w *workflow) sqlCountTemp() string { | |
| return fmt.Sprintf(`SELECT count() FROM %s`, w.tempTable) | |
| } | |
| func (w *workflow) sqlUpdateProps() string { | |
| return fmt.Sprintf(`ALTER TABLE %s | |
| UPDATE properties = JSONDropKeys(%s)(properties), inserted_at = now() | |
| WHERE team_id = %d`, | |
| w.tempTable, w.propsArrayLiteral(), w.cfg.TeamID) | |
| } | |
| func (w *workflow) sqlVerifyUpdate() string { | |
| return fmt.Sprintf(`SELECT count() | |
| FROM %s | |
| WHERE team_id = %d | |
| AND event IN (%s) | |
| AND timestamp BETWEEN '%s' AND '%s' | |
| AND hasAny(JSONExtractKeys(properties), %s)`, | |
| w.tempTable, w.cfg.TeamID, w.eventInClause(), | |
| w.cfg.StartTime, w.cfg.EndTime, w.propsArrayLiteral()) | |
| } | |
| func (w *workflow) sqlDeleteOriginals() string { | |
| return fmt.Sprintf(`DELETE FROM sharded_events | |
| WHERE %s | |
| SETTINGS max_execution_time = 10000, lightweight_deletes_sync = 1`, w.whereClause()) | |
| } | |
| func (w *workflow) sqlInsertBack() string { | |
| return fmt.Sprintf(`INSERT INTO sharded_events | |
| SELECT * | |
| FROM %s | |
| SETTINGS max_execution_time = 1800`, w.tempTable) | |
| } | |
| func (w *workflow) sqlVerifyFinal() string { | |
| return fmt.Sprintf(`SELECT count() | |
| FROM sharded_events | |
| WHERE team_id = %d | |
| AND event IN (%s) | |
| AND timestamp BETWEEN '%s' AND '%s' | |
| AND hasAny(JSONExtractKeys(properties), %s)`, | |
| w.cfg.TeamID, w.eventInClause(), | |
| w.cfg.StartTime, w.cfg.EndTime, w.propsArrayLiteral()) | |
| } | |
| func (w *workflow) sqlDropTemp() string { | |
| return fmt.Sprintf(`DROP TABLE %s`, w.tempTable) | |
| } | |
| // Execution helpers | |
// printStep writes a numbered step header followed by the step's SQL,
// terminated with a semicolon, to standard output.
func printStep(n int, name, query string) {
	fmt.Printf("\n--- Step %d: %s ---\n%s;\n", n, name, query)
}
| func (w *workflow) execQuery(ctx context.Context, query string) error { | |
| _, err := w.db.ExecContext(ctx, query) | |
| return err | |
| } | |
| func (w *workflow) queryCount(ctx context.Context, query string) (uint64, error) { | |
| var count uint64 | |
| err := w.db.QueryRowContext(ctx, query).Scan(&count) | |
| return count, err | |
| } | |
| func (w *workflow) waitForMutations(ctx context.Context) error { | |
| fmt.Println("waiting for mutations to finish...") | |
| query := fmt.Sprintf( | |
| `SELECT count() FROM system.mutations WHERE table = '%s' AND is_done = 0`, | |
| w.tempTable, | |
| ) | |
| for { | |
| var pending uint64 | |
| if err := w.db.QueryRowContext(ctx, query).Scan(&pending); err != nil { | |
| return fmt.Errorf("polling mutations: %w", err) | |
| } | |
| if pending == 0 { | |
| fmt.Println("all mutations finished") | |
| return nil | |
| } | |
| fmt.Printf(" %d mutation(s) still running...\n", pending) | |
| time.Sleep(5 * time.Second) | |
| } | |
| } | |
| // Run modes | |
| func (w *workflow) runDryRun() { | |
| fmt.Println("=== DRY RUN (no ClickHouse connection) ===") | |
| fmt.Printf("temp table: %s\n", w.tempTable) | |
| printStep(1, "Count matching events", w.sqlCount()) | |
| printStep(2, "Create temp table", w.sqlCreateTemp()) | |
| printStep(3, "Copy events to temp table", w.sqlCopyToTemp()) | |
| printStep(4, "Count copied events", w.sqlCountTemp()) | |
| printStep(5, "Update properties (drop keys)", w.sqlUpdateProps()) | |
| printStep(6, "Verify update (expect 0)", w.sqlVerifyUpdate()) | |
| printStep(7, "Delete originals", w.sqlDeleteOriginals()) | |
| printStep(8, "Insert back from temp", w.sqlInsertBack()) | |
| printStep(9, "Verify final (expect 0)", w.sqlVerifyFinal()) | |
| printStep(10, "Drop temp table", w.sqlDropTemp()) | |
| fmt.Println("\n=== END DRY RUN ===") | |
| } | |
| func (w *workflow) runIntrospect() { | |
| ctx := context.Background() | |
| fmt.Println("=== INTROSPECT MODE ===") | |
| fmt.Printf("temp table: %s\n", w.tempTable) | |
| // Step 1: execute count | |
| q := w.sqlCount() | |
| printStep(1, "Count matching events", q) | |
| count, err := w.queryCount(ctx, q) | |
| if err != nil { | |
| log.Fatalf("count query failed: %v", err) | |
| } | |
| fmt.Printf(">>> result: %d matching events\n", count) | |
| // Steps 2-10: print only | |
| printStep(2, "Create temp table", w.sqlCreateTemp()) | |
| printStep(3, "Copy events to temp table", w.sqlCopyToTemp()) | |
| printStep(4, "Count copied events", w.sqlCountTemp()) | |
| printStep(5, "Update properties (drop keys)", w.sqlUpdateProps()) | |
| printStep(6, "Verify update (expect 0)", w.sqlVerifyUpdate()) | |
| printStep(7, "Delete originals", w.sqlDeleteOriginals()) | |
| printStep(8, "Insert back from temp", w.sqlInsertBack()) | |
| printStep(9, "Verify final (expect 0)", w.sqlVerifyFinal()) | |
| printStep(10, "Drop temp table", w.sqlDropTemp()) | |
| fmt.Println("\n=== END INTROSPECT ===") | |
| } | |
| func (w *workflow) runAll() { | |
| ctx := context.Background() | |
| fmt.Println("=== EXECUTE MODE ===") | |
| fmt.Printf("temp table: %s\n", w.tempTable) | |
| // Step 1: Count | |
| q := w.sqlCount() | |
| printStep(1, "Count matching events", q) | |
| expectedCount, err := w.queryCount(ctx, q) | |
| if err != nil { | |
| log.Fatalf("step 1 failed: %v", err) | |
| } | |
| fmt.Printf(">>> %d matching events\n", expectedCount) | |
| if expectedCount == 0 { | |
| fmt.Println("no matching events found, nothing to do") | |
| return | |
| } | |
| // Step 2: Create temp table | |
| q = w.sqlCreateTemp() | |
| printStep(2, "Create temp table", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| log.Fatalf("step 2 failed: %v", err) | |
| } | |
| fmt.Println(">>> done") | |
| // From here on, clean up temp table on failure | |
| cleanup := func() { | |
| fmt.Printf("\ncleaning up: dropping temp table %s\n", w.tempTable) | |
| _ = w.execQuery(ctx, w.sqlDropTemp()) | |
| } | |
| // Step 3: Copy events | |
| q = w.sqlCopyToTemp() | |
| printStep(3, "Copy events to temp table", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| cleanup() | |
| log.Fatalf("step 3 failed: %v", err) | |
| } | |
| fmt.Println(">>> done") | |
| // Step 4: Count copied | |
| q = w.sqlCountTemp() | |
| printStep(4, "Count copied events", q) | |
| copiedCount, err := w.queryCount(ctx, q) | |
| if err != nil { | |
| cleanup() | |
| log.Fatalf("step 4 failed: %v", err) | |
| } | |
| fmt.Printf(">>> %d rows in temp table\n", copiedCount) | |
| if copiedCount != expectedCount { | |
| cleanup() | |
| log.Fatalf("step 4 verification failed: expected %d rows but got %d", expectedCount, copiedCount) | |
| } | |
| fmt.Println(">>> counts match") | |
| // Step 5: Update properties | |
| q = w.sqlUpdateProps() | |
| printStep(5, "Update properties (drop keys)", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| cleanup() | |
| log.Fatalf("step 5 failed: %v", err) | |
| } | |
| fmt.Println(">>> mutation submitted") | |
| // Wait for mutations to finish | |
| if err := w.waitForMutations(ctx); err != nil { | |
| cleanup() | |
| log.Fatalf("step 5 mutation wait failed: %v", err) | |
| } | |
| // Step 6: Verify update | |
| q = w.sqlVerifyUpdate() | |
| printStep(6, "Verify update (expect 0)", q) | |
| remaining, err := w.queryCount(ctx, q) | |
| if err != nil { | |
| cleanup() | |
| log.Fatalf("step 6 failed: %v", err) | |
| } | |
| fmt.Printf(">>> %d rows still have dropped keys\n", remaining) | |
| if remaining != 0 { | |
| cleanup() | |
| log.Fatalf("step 6 verification failed: %d rows still have properties that should have been dropped", remaining) | |
| } | |
| // Step 7: Delete originals | |
| q = w.sqlDeleteOriginals() | |
| printStep(7, "Delete originals", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| cleanup() | |
| log.Fatalf("step 7 failed: %v", err) | |
| } | |
| fmt.Println(">>> done") | |
| // Step 8: Insert back | |
| q = w.sqlInsertBack() | |
| printStep(8, "Insert back from temp", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| cleanup() | |
| log.Fatalf("step 8 failed: %v", err) | |
| } | |
| fmt.Println(">>> done") | |
| // Step 9: Verify final | |
| q = w.sqlVerifyFinal() | |
| printStep(9, "Verify final (expect 0)", q) | |
| finalCount, err := w.queryCount(ctx, q) | |
| if err != nil { | |
| cleanup() | |
| log.Fatalf("step 9 failed: %v", err) | |
| } | |
| fmt.Printf(">>> %d rows still have dropped keys\n", finalCount) | |
| if finalCount != 0 { | |
| cleanup() | |
| log.Fatalf("step 9 verification failed: %d rows in sharded_events still have properties that should have been dropped", finalCount) | |
| } | |
| // Step 10: Drop temp table | |
| q = w.sqlDropTemp() | |
| printStep(10, "Drop temp table", q) | |
| if err := w.execQuery(ctx, q); err != nil { | |
| log.Fatalf("step 10 failed: %v", err) | |
| } | |
| fmt.Println(">>> done") | |
| fmt.Println("\n=== COMPLETE ===") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment