Skip to content

Instantly share code, notes, and snippets.

@cododel
Created April 18, 2025 14:51
Show Gist options
  • Save cododel/bb2fc2f06ff438ed38847f397f179484 to your computer and use it in GitHub Desktop.
Save cododel/bb2fc2f06ff438ed38847f397f179484 to your computer and use it in GitHub Desktop.
Convert "Sql INSERT INTO" to "Directus-Sync seed" file
import re
import json
import argparse
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
def normalize_sql_content(sql_content: str) -> List[str]:
    """
    Normalize raw SQL text and split it into individual statements.

    Steps:
    1. Strip block comments (/* ... */) and line comments (-- ...).
    2. Collapse everything onto one line and standardize whitespace
       around commas and parentheses.
    3. Split into statements on semicolons that fall outside single quotes.
    4. Lowercase the INSERT INTO / VALUES keywords.

    NOTE(review): the whitespace/keyword regex passes are not quote-aware,
    so spacing (and the bare words "insert into"/"values") inside quoted
    string values can be altered — acceptable for typical seed data, but
    worth knowing.

    Args:
        sql_content: Raw contents of an SQL file.

    Returns:
        A list of normalized SQL statements (each keeps its trailing
        semicolon when one was present).
    """
    # Remove block comments first /* ... */
    sql_content = re.sub(r'/\*.*?\*/', '', sql_content, flags=re.DOTALL)
    # Remove single-line comments -- ...
    sql_content = re.sub(r'--.*$', '', sql_content, flags=re.MULTILINE)
    # Join all non-empty lines into a single line
    lines = [line.strip() for line in sql_content.splitlines() if line.strip()]
    sql_content = ' '.join(lines)
    # Standardize whitespace around commas and parentheses
    sql_content = re.sub(r'\s*,\s*', ',', sql_content)
    sql_content = re.sub(r'\s*\(\s*', '(', sql_content)
    sql_content = re.sub(r'\s*\)\s*', ')', sql_content)
    sql_content = re.sub(r'\s+', ' ', sql_content).strip()
    # Split on semicolons that are not inside single-quoted strings
    # (a doubled '' escape toggles the flag twice, which nets out correctly)
    statements: List[str] = []
    current_statement: List[str] = []
    in_quotes = False
    for char in sql_content:
        current_statement.append(char)
        if char == "'":
            in_quotes = not in_quotes
        elif char == ';' and not in_quotes:
            statements.append("".join(current_statement).strip())
            current_statement = []
    if current_statement:  # Trailing statement without a closing semicolon
        statements.append("".join(current_statement).strip())
    # Lowercase only the keywords the downstream parser relies on
    normalized_statements = []
    for stmt in statements:
        stmt = re.sub(r'\binsert\s+into\b', 'insert into', stmt, flags=re.IGNORECASE)
        stmt = re.sub(r'\bvalues\b', 'values', stmt, flags=re.IGNORECASE)
        normalized_statements.append(stmt)
    return normalized_statements
def parse_value(value_str: str) -> Any:
    """
    Parse a single SQL literal into a Python value.

    Handles:
    - Single-quoted strings, including an optional Postgres cast suffix
      such as 'abc'::uuid or '2024-01-01'::timestamp without time zone
      (the cast is discarded).
    - Doubled single quotes ('') inside strings, unescaped to a single '.
    - NULL -> None, TRUE/FALSE -> bool, then int, then float.
    - Anything else is returned as the raw stripped string.
    """
    value_str = value_str.strip()
    # Quoted string with an optional ::cast suffix
    cast_match = re.match(r"^'(.*?)'(?:::[\w\s]+)?$", value_str)
    if cast_match:
        # SQL escapes a literal quote by doubling it; undo that here
        return cast_match.group(1).replace("''", "'")
    # Fallback for quoted strings the cast pattern did not match
    if value_str.startswith("'") and value_str.endswith("'"):
        return value_str[1:-1].replace("''", "'")
    lowered = value_str.lower()
    if lowered == 'null':
        return None
    # SQL boolean literals become JSON booleans in the seed output
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    # Numbers: prefer int, then float
    try:
        return int(value_str)
    except ValueError:
        try:
            return float(value_str)
        except ValueError:
            # Not null, quoted, boolean, or numeric — keep as-is
            return value_str
def parse_insert_statement(statement: str) -> Optional[Tuple[str, List[str], List[List[Any]]]]:
"""Parses a single INSERT INTO statement."""
if not statement.lower().startswith('insert into'):
return None
# Extract table name (quoted or unquoted)
table_match = re.search(r'insert\s+into\s+("?)([\w\.]+)\1', statement, re.IGNORECASE)
if not table_match:
print(f"Warning: Could not parse table name from: {statement[:100]}...")
return None
table_name = table_match.group(2)
# Extract columns (quoted or unquoted)
columns_match = re.search(r'\((.*?)\)\s*values', statement, re.IGNORECASE | re.DOTALL)
if not columns_match:
print(f"Warning: Could not parse columns from: {statement[:100]}...")
return None
columns_str = columns_match.group(1)
# Find quoted or unquoted column names
columns = [col.strip().strip('"') for col in columns_str.split(',')]
# Extract value tuples
# Find the start of the first VALUES tuple after the columns part
values_start_index = columns_match.end()
values_part = statement[values_start_index:].strip()
# Regex to find content within parentheses, attempting to handle nested ones simply
# This is still fragile for complex nested structures or escaped quotes
value_tuples_str = re.findall(r'\((.*?)\)', values_part)
parsed_values = []
for tuple_str in value_tuples_str:
# Split values within a tuple, basic handling for commas in quotes
values_in_tuple = []
current_val = []
in_quotes = False
for char in tuple_str:
if char == "'":
in_quotes = not in_quotes
current_val.append(char)
elif char == ',' and not in_quotes:
values_in_tuple.append("".join(current_val).strip())
current_val = []
else:
current_val.append(char)
values_in_tuple.append("".join(current_val).strip()) # Add the last value
if len(values_in_tuple) == len(columns):
parsed_values.append([parse_value(v) for v in values_in_tuple])
else:
print(f"Warning: Mismatch between column count ({len(columns)}) and value count ({len(values_in_tuple)}) in tuple: {tuple_str}")
if not columns or not parsed_values:
print(f"Warning: Could not parse columns or values correctly for table {table_name}")
return None
return table_name, columns, parsed_values
def create_seed_format(table_name: str, columns: List[str], values_list: List[List[Any]]) -> Dict[str, Any]:
    """
    Build a Directus-Sync seed entry for one table.

    The primary key is guessed as 'id', then 'uuid', then the first
    column; its value drives the generated `_sync_id` for each row.
    """
    # Guess which column acts as the primary key
    for candidate in ('id', 'uuid'):
        if candidate in columns:
            pk_column = candidate
            break
    else:
        pk_column = columns[0]  # Fallback to the first column
    pk_index = columns.index(pk_column)

    items = []
    for position, row in enumerate(values_list):
        record = dict(zip(columns, row))
        pk_value = row[pk_index]
        # _sync_id falls back to the row position when the PK is NULL
        if pk_value is None:
            record['_sync_id'] = f"{table_name}-item-{position}"
        else:
            record['_sync_id'] = f"{table_name}-{pk_value}"
        items.append(record)

    meta = {
        "insert_order": 1,  # Default order, might need adjustment for dependencies
        "create": True,
        "update": True,
        "delete": True,
        "preserve_ids": True,  # Assumes the PK is included and should be preserved
        "ignore_on_update": [],
    }
    return {"collection": table_name, "meta": meta, "data": items}
def process_sql_file(input_file: Path, output_file: Path) -> None:
    """
    Read an SQL file, convert its INSERT statements to Directus-Sync seed
    entries, and write the result to *output_file* as JSON.

    A single parsed table is written as one seed object; multiple tables
    are written as a JSON array.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        statements = normalize_sql_content(f.read())

    all_seeds = []
    processed_tables = set()
    for statement in statements:
        parsed = parse_insert_statement(statement)
        if not parsed:
            continue
        table_name, columns, values_list = parsed
        print(f"Processing table: {table_name} with {len(values_list)} rows.")
        all_seeds.append(create_seed_format(table_name, columns, values_list))
        processed_tables.add(table_name)

    if not all_seeds:
        print("Error: No valid INSERT statements found or parsed.")
        return

    # Heuristic ordering for multi-table output: run translations last.
    # Complex dependency chains still need manual ordering.
    if len(all_seeds) > 1:
        print(f"Processed multiple tables: {processed_tables}. Consider adjusting 'insert_order' manually if there are dependencies.")
        for seed in all_seeds:
            is_translations = 'translations' in seed['collection'].lower()
            seed['meta']['insert_order'] = 2 if is_translations else 1

    output_data = all_seeds[0] if len(all_seeds) == 1 else all_seeds
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)
        print(f"Successfully converted {input_file} to {output_file}")
    except Exception as e:
        print(f"Error writing JSON to {output_file}: {e}")
def main():
    """CLI entry point: convert an SQL file into a Directus seed JSON file."""
    parser = argparse.ArgumentParser(description='Convert SQL INSERT statements to Directus seed format.')
    parser.add_argument('-i', '--input', type=Path, required=True,
                        help='Input SQL file path')
    parser.add_argument('-o', '--output', type=Path, required=True,
                        help='Output JSON file path')
    args = parser.parse_args()

    # Guard: nothing to do without a readable input file
    if not args.input.exists():
        print(f"Error: Input file '{args.input}' does not exist")
        return

    # Make sure the destination directory exists before writing
    args.output.parent.mkdir(parents=True, exist_ok=True)

    try:
        process_sql_file(args.input, args.output)
    except Exception as e:
        # Top-level boundary: report instead of crashing the CLI
        print(f"An unexpected error occurred during processing: {e}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment