Last active
November 10, 2025 19:51
-
-
Save haileyok/e55058055e69a767d09837412e92c822 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // All calling code should properly handle locking | |
| func (edi *EventDataInserter) hasNewColumns(data map[string]any) bool { | |
| for key := range data { | |
| if edi.schemaCache[strings.ToLower(key)] { | |
| continue | |
| } | |
| return true | |
| } | |
| return false | |
| } | |
| func (edi *EventDataInserter) refreshSchemaCache(ctx context.Context) error { | |
| metadata, err := edi.tableRef.Metadata(ctx) | |
| if err != nil { | |
| return fmt.Errorf("failed to get table metadata: %w", err) | |
| } | |
| for _, f := range metadata.Schema { | |
| edi.schemaCache[strings.ToLower(f.Name)] = true | |
| } | |
| return nil | |
| } | |
| func (edi *EventDataInserter) addColumns(ctx context.Context, data map[string]any) error { | |
| // get the latest metadata | |
| metadata, err := edi.tableRef.Metadata(ctx) | |
| if err != nil { | |
| return fmt.Errorf("failed to get table metadata: %w", err) | |
| } | |
| currCols := make(map[string]bool) | |
| for _, f := range metadata.Schema { | |
| currCols[strings.ToLower(f.Name)] = true | |
| } | |
| newSchema := append(bigquery.Schema{}, metadata.Schema...) | |
| var newCols []string | |
| for colName, val := range data { | |
| if currCols[strings.ToLower(colName)] { | |
| continue | |
| } | |
| fieldType, repeated := inferBigQueryType(val) | |
| newSchema = append(newSchema, &bigquery.FieldSchema{ | |
| Name: colName, | |
| Type: fieldType, | |
| Repeated: repeated, | |
| }) | |
| newCols = append(newCols, colName) | |
| } | |
| if len(newCols) == 0 { | |
| return nil | |
| } | |
| update := bigquery.TableMetadataToUpdate{ | |
| Schema: newSchema, | |
| } | |
| // NOTE: even after this completes, i've noticed there are still some errors for ~10-15 seconds when trying to | |
| // perform inserts. important that whatever inserting logic you use properly retry failed inserts! | |
| if _, err := edi.tableRef.Update(ctx, update, ""); err != nil { | |
| return err | |
| } | |
| for _, col := range newCols { | |
| edi.schemaCache[strings.ToLower(col)] = true | |
| } | |
| edi.logger.Info("added new columns to data table", "columns", newCols) | |
| return nil | |
| } | |
| func inferBigQueryType(value any) (bigquery.FieldType, bool) { | |
| switch v := value.(type) { | |
| case bool: | |
| return bigquery.BooleanFieldType, false | |
| case float32, float64: | |
| return bigquery.FloatFieldType, false | |
| case int, int32, int64: | |
| return bigquery.IntegerFieldType, false | |
| case string: | |
| return bigquery.StringFieldType, false | |
| case time.Time: | |
| return bigquery.TimestampFieldType, false | |
| case []string: | |
| return bigquery.StringFieldType, true | |
| case []bool: | |
| return bigquery.BooleanFieldType, true | |
| case []int, []int32, []int64: | |
| return bigquery.IntegerFieldType, true | |
| case []float32, []float64: | |
| return bigquery.FloatFieldType, true | |
| case []any: | |
| if len(v) > 0 { | |
| elemType, _ := inferBigQueryType(v[0]) | |
| return elemType, true | |
| } | |
| return bigquery.StringFieldType, true | |
| default: | |
| return bigquery.StringFieldType, false | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment