Skip to content

Instantly share code, notes, and snippets.

@haileyok
Last active November 10, 2025 19:51
Show Gist options
  • Select an option

  • Save haileyok/e55058055e69a767d09837412e92c822 to your computer and use it in GitHub Desktop.

Select an option

Save haileyok/e55058055e69a767d09837412e92c822 to your computer and use it in GitHub Desktop.
// NOTE: the methods here do no locking of their own around schemaCache;
// all calling code should properly handle locking.

// hasNewColumns reports whether data contains at least one key that is not
// marked as present in the schema cache. Keys are lower-cased before the
// lookup, so the comparison is case-insensitive.
func (edi *EventDataInserter) hasNewColumns(data map[string]any) bool {
	for name := range data {
		if known := edi.schemaCache[strings.ToLower(name)]; !known {
			return true
		}
	}
	return false
}
// refreshSchemaCache fetches the table's current metadata and marks every
// field name in its schema (lower-cased) as present in the schema cache.
// Entries already in the cache are kept; nothing is removed.
//
// It returns a wrapped error when the metadata cannot be fetched, in which
// case the cache is left untouched.
func (edi *EventDataInserter) refreshSchemaCache(ctx context.Context) error {
	md, err := edi.tableRef.Metadata(ctx)
	if err != nil {
		return fmt.Errorf("failed to get table metadata: %w", err)
	}

	schema := md.Schema
	for i := range schema {
		edi.schemaCache[strings.ToLower(schema[i].Name)] = true
	}
	return nil
}
// addColumns extends the table schema with one column for every key in data
// that the table does not have yet. Column names are compared
// case-insensitively (lower-cased), and the type of each new column is
// inferred from the corresponding value via inferBigQueryType.
//
// The schema cache is synced with the freshly fetched table schema, so a
// stale cache is repaired even when there turns out to be nothing to add.
//
// It returns a wrapped error if the table metadata cannot be fetched or the
// schema update fails. On an update failure the new columns are not recorded
// in the cache.
func (edi *EventDataInserter) addColumns(ctx context.Context, data map[string]any) error {
	// get the latest metadata
	metadata, err := edi.tableRef.Metadata(ctx)
	if err != nil {
		return fmt.Errorf("failed to get table metadata: %w", err)
	}

	// seen tracks every lower-cased column name that is either already in the
	// table or has been queued for addition in this call.
	seen := make(map[string]bool, len(metadata.Schema)+len(data))
	for _, f := range metadata.Schema {
		name := strings.ToLower(f.Name)
		seen[name] = true
		// These columns exist in the table right now, so record them. Without
		// this, a stale cache makes hasNewColumns report true on every call.
		edi.schemaCache[name] = true
	}

	newSchema := append(bigquery.Schema{}, metadata.Schema...)
	var newCols []string
	for colName, val := range data {
		key := strings.ToLower(colName)
		if seen[key] {
			continue
		}
		// Mark the name immediately so that a second key differing only by
		// case (e.g. "Foo" and "foo") does not add a duplicate column.
		// Which spelling wins depends on map iteration order.
		seen[key] = true

		fieldType, repeated := inferBigQueryType(val)
		newSchema = append(newSchema, &bigquery.FieldSchema{
			Name:     colName,
			Type:     fieldType,
			Repeated: repeated,
		})
		newCols = append(newCols, colName)
	}
	if len(newCols) == 0 {
		return nil
	}

	update := bigquery.TableMetadataToUpdate{
		Schema: newSchema,
	}
	// NOTE: even after this completes, i've noticed there are still some errors for ~10-15 seconds when trying to
	// perform inserts. important that whatever inserting logic you use properly retry failed inserts!
	if _, err := edi.tableRef.Update(ctx, update, ""); err != nil {
		return fmt.Errorf("failed to update table schema: %w", err)
	}

	for _, col := range newCols {
		edi.schemaCache[strings.ToLower(col)] = true
	}
	edi.logger.Info("added new columns to data table", "columns", newCols)
	return nil
}
// inferBigQueryType maps a Go value to the BigQuery field type used for a
// column holding it. The second result reports whether the column should be
// repeated, i.e. whether the value is a slice.
//
// Mapping:
//   - bool -> BOOLEAN
//   - float32, float64 -> FLOAT
//   - int, int8..int64, uint8, uint16, uint32 -> INTEGER
//   - string -> STRING
//   - time.Time -> TIMESTAMP
//   - typed slices of the above -> same element type, repeated
//   - []any -> type inferred from the first element, repeated
//     (STRING when the slice is empty)
//   - anything else, including nil -> STRING, not repeated
//
// uint and uint64 are intentionally left on the STRING fallback because their
// values may not fit in a signed 64-bit INTEGER. []uint8 is also left on the
// fallback because it is the same type as []byte, not a list of integers.
func inferBigQueryType(value any) (bigquery.FieldType, bool) {
	switch v := value.(type) {
	case bool:
		return bigquery.BooleanFieldType, false
	case float32, float64:
		return bigquery.FloatFieldType, false
	case int, int8, int16, int32, int64, uint8, uint16, uint32:
		return bigquery.IntegerFieldType, false
	case string:
		return bigquery.StringFieldType, false
	case time.Time:
		return bigquery.TimestampFieldType, false
	case []string:
		return bigquery.StringFieldType, true
	case []bool:
		return bigquery.BooleanFieldType, true
	case []int, []int8, []int16, []int32, []int64, []uint16, []uint32:
		return bigquery.IntegerFieldType, true
	case []float32, []float64:
		return bigquery.FloatFieldType, true
	case []time.Time:
		return bigquery.TimestampFieldType, true
	case []any:
		if len(v) > 0 {
			// Only the first element is inspected. Its own repeated flag is
			// dropped, so a nested slice collapses to a single repeated level.
			elemType, _ := inferBigQueryType(v[0])
			return elemType, true
		}
		return bigquery.StringFieldType, true
	default:
		return bigquery.StringFieldType, false
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment