Created
April 18, 2025 14:51
-
-
Save cododel/bb2fc2f06ff438ed38847f397f179484 to your computer and use it in GitHub Desktop.
Convert "SQL INSERT INTO" to "Directus-Sync seed" file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import json | |
import argparse | |
from typing import List, Dict, Any, Tuple, Optional | |
from pathlib import Path | |
def normalize_sql_content(sql_content: str) -> List[str]:
    """
    Normalize raw SQL text and split it into individual statements.

    Steps:
    1. Strip block comments (/* ... */) and line comments (-- ...).
    2. Collapse the text onto a single line with standardized whitespace
       (no spaces around commas or parentheses).
    3. Split on semicolons that are outside single-quoted strings.
    4. Lowercase the ``INSERT INTO`` / ``VALUES`` keywords in each statement
       so downstream parsing can match them case-insensitively.

    Args:
        sql_content: Raw SQL source text (possibly multi-line, commented).

    Returns:
        A list of normalized single-line statement strings.

    NOTE: comment stripping is regex-based and can mangle string literals
    that themselves contain ``--`` or ``/*``; a full SQL lexer would be
    required to handle that safely.
    """
    # Remove block comments first: /* ... */ (DOTALL so they may span lines).
    sql_content = re.sub(r'/\*.*?\*/', '', sql_content, flags=re.DOTALL)
    # Remove single-line comments: -- to end of line.
    sql_content = re.sub(r'--.*$', '', sql_content, flags=re.MULTILINE)

    # Join non-empty, stripped lines into one line.
    sql_content = ' '.join(
        line.strip() for line in sql_content.splitlines() if line.strip()
    )

    # Standardize whitespace around commas and parentheses.
    sql_content = re.sub(r'\s*,\s*', ',', sql_content)
    sql_content = re.sub(r'\s*\(\s*', '(', sql_content)
    sql_content = re.sub(r'\s*\)\s*', ')', sql_content)
    sql_content = re.sub(r'\s+', ' ', sql_content).strip()

    # Split statements on semicolons that are not inside single quotes.
    statements: List[str] = []
    current_statement: List[str] = []
    in_quotes = False
    for char in sql_content:
        current_statement.append(char)
        if char == "'":
            in_quotes = not in_quotes
        elif char == ';' and not in_quotes:
            statements.append("".join(current_statement).strip())
            current_statement = []
    if current_statement:  # Trailing statement without a terminating ';'
        statements.append("".join(current_statement).strip())

    # Lowercase only the keywords we rely on downstream; lowercasing
    # everything safely (without touching quoted text) needs a real parser.
    normalized_statements = []
    for stmt in statements:
        stmt = re.sub(r'\binsert\s+into\b', 'insert into', stmt, flags=re.IGNORECASE)
        stmt = re.sub(r'\bvalues\b', 'values', stmt, flags=re.IGNORECASE)
        normalized_statements.append(stmt)
    return normalized_statements
def parse_value(value_str: str) -> Any:
    """Parse a single SQL literal into a Python value.

    Handles single-quoted strings (including PostgreSQL-style casts such as
    ``'x'::uuid`` and doubled-quote escapes ``''``), ``NULL``, boolean
    literals, and integer/float numbers.  Anything else is returned
    unchanged as a string.

    Args:
        value_str: One raw value token from an INSERT tuple.

    Returns:
        str, None, bool, int, or float depending on the literal.
    """
    value_str = value_str.strip()
    # Quoted string optionally followed by a cast: 'abc' or 'abc'::uuid
    cast_match = re.match(r"^'(.*?)'(?:::[\w\s]+)?$", value_str)
    if cast_match:
        # SQL escapes a single quote inside a string by doubling it ('').
        return cast_match.group(1).replace("''", "'")
    # Fallback for simple quoted strings the regex above did not catch.
    if value_str.startswith("'") and value_str.endswith("'"):
        return value_str[1:-1].replace("''", "'")
    lowered = value_str.lower()
    # NULL and boolean literals are unquoted keywords in SQL.
    if lowered == 'null':
        return None
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    # Numbers (int preferred over float).
    try:
        return int(value_str)
    except ValueError:
        try:
            return float(value_str)
        except ValueError:
            # Not null, quoted, boolean, or numeric — keep as-is.
            return value_str
def _split_outside_quotes(text: str, sep: str) -> List[str]:
    """Split *text* on *sep* characters that are not inside single quotes."""
    parts: List[str] = []
    current: List[str] = []
    in_quotes = False
    for ch in text:
        if ch == "'":
            in_quotes = not in_quotes
            current.append(ch)
        elif ch == sep and not in_quotes:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())  # last segment
    return parts


def _extract_value_tuples(values_part: str) -> List[str]:
    """Return the inner text of each top-level ``(...)`` group in *values_part*.

    Tracks quote state and parenthesis depth so that parentheses inside
    single-quoted values (or nested within a value) do not terminate a
    tuple early.
    """
    tuples: List[str] = []
    depth = 0
    in_quotes = False
    start = 0
    for i, ch in enumerate(values_part):
        if ch == "'":
            in_quotes = not in_quotes
        elif in_quotes:
            continue
        elif ch == '(':
            depth += 1
            if depth == 1:
                start = i + 1
        elif ch == ')' and depth > 0:
            depth -= 1
            if depth == 0:
                tuples.append(values_part[start:i])
    return tuples


def parse_insert_statement(statement: str) -> Optional[Tuple[str, List[str], List[List[Any]]]]:
    """Parse a single INSERT INTO statement.

    Args:
        statement: One normalized statement (see ``normalize_sql_content``).

    Returns:
        ``(table_name, columns, rows)`` where ``rows`` is a list of value
        lists (one per VALUES tuple), or ``None`` if the statement is not a
        parseable INSERT.  Tuples whose value count does not match the
        column count are skipped with a warning.
    """
    if not statement.lower().startswith('insert into'):
        return None
    # Extract table name (optionally double-quoted, may be schema-qualified).
    table_match = re.search(r'insert\s+into\s+("?)([\w\.]+)\1', statement, re.IGNORECASE)
    if not table_match:
        print(f"Warning: Could not parse table name from: {statement[:100]}...")
        return None
    table_name = table_match.group(2)
    # Extract the column list: the parenthesized group before VALUES.
    columns_match = re.search(r'\((.*?)\)\s*values', statement, re.IGNORECASE | re.DOTALL)
    if not columns_match:
        print(f"Warning: Could not parse columns from: {statement[:100]}...")
        return None
    columns_str = columns_match.group(1)
    # Strip surrounding double quotes from quoted identifiers.
    columns = [col.strip().strip('"') for col in columns_str.split(',')]
    # Everything after the column list holds the VALUES tuples.
    values_part = statement[columns_match.end():].strip()
    parsed_values = []
    # Quote/depth-aware extraction (a plain regex would truncate tuples
    # whose values contain ')' or nested parentheses).
    for tuple_str in _extract_value_tuples(values_part):
        values_in_tuple = _split_outside_quotes(tuple_str, ',')
        if len(values_in_tuple) == len(columns):
            parsed_values.append([parse_value(v) for v in values_in_tuple])
        else:
            print(f"Warning: Mismatch between column count ({len(columns)}) and value count ({len(values_in_tuple)}) in tuple: {tuple_str}")
    if not columns or not parsed_values:
        print(f"Warning: Could not parse columns or values correctly for table {table_name}")
        return None
    return table_name, columns, parsed_values
def create_seed_format(table_name: str, columns: List[str], values_list: List[List[Any]]) -> Dict[str, Any]:
    """Build the Directus-Sync seed dictionary for a single collection.

    Args:
        table_name: Target collection name.
        columns: Column names, in the same order as each row's values.
        values_list: Parsed rows; each row aligns positionally with *columns*.

    Returns:
        A seed dict with ``collection``, ``meta``, and ``data`` keys, where
        every data item carries a generated ``_sync_id``.
    """
    # Pick the key column used for _sync_id: prefer 'id', then 'uuid',
    # otherwise fall back to the first column.
    if 'id' in columns:
        key_col = 'id'
    elif 'uuid' in columns:
        key_col = 'uuid'
    else:
        key_col = columns[0]
    key_idx = columns.index(key_col)

    records: List[Dict[str, Any]] = []
    for row in values_list:
        record = {col: val for col, val in zip(columns, row)}
        key_val = row[key_idx]
        # Rows lacking a key value get a positional fallback id.
        if key_val is None:
            record['_sync_id'] = f"{table_name}-item-{len(records)}"
        else:
            record['_sync_id'] = f"{table_name}-{key_val}"
        records.append(record)

    return {
        "collection": table_name,
        "meta": {
            "insert_order": 1,  # Default order, might need adjustment for dependencies
            "create": True,
            "update": True,
            "delete": True,
            "preserve_ids": True,  # Assumes the PK is included and should be preserved
            "ignore_on_update": [],
        },
        "data": records,
    }
def process_sql_file(input_file: Path, output_file: Path) -> None:
    """Convert the INSERT statements in *input_file* to Directus seed JSON.

    Reads the SQL, parses every INSERT INTO statement into a seed structure,
    applies a simple ordering heuristic across tables, and writes the result
    to *output_file* (a single object for one table, a list for several).
    """
    sql_content = input_file.read_text(encoding='utf-8')

    all_seeds = []
    processed_tables = set()
    for statement in normalize_sql_content(sql_content):
        parsed = parse_insert_statement(statement)
        if not parsed:
            continue
        table_name, columns, values_list = parsed
        print(f"Processing table: {table_name} with {len(values_list)} rows.")
        all_seeds.append(create_seed_format(table_name, columns, values_list))
        processed_tables.add(table_name)

    if not all_seeds:
        print("Error: No valid INSERT statements found or parsed.")
        return

    # Heuristic ordering for multiple tables: translation tables are seeded
    # after everything else; complex dependencies still need manual ordering.
    if len(all_seeds) > 1:
        print(f"Processed multiple tables: {processed_tables}. Consider adjusting 'insert_order' manually if there are dependencies.")
        for seed in all_seeds:
            is_translations = 'translations' in seed['collection'].lower()
            seed['meta']['insert_order'] = 2 if is_translations else 1

    # A single table is written as one object, several as a list.
    payload = all_seeds[0] if len(all_seeds) == 1 else all_seeds
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        print(f"Successfully converted {input_file} to {output_file}")
    except Exception as e:
        print(f"Error writing JSON to {output_file}: {e}")
def main():
    """CLI entry point: parse arguments and run the SQL-to-seed conversion."""
    arg_parser = argparse.ArgumentParser(description='Convert SQL INSERT statements to Directus seed format.')
    arg_parser.add_argument('-i', '--input', type=Path, required=True,
                            help='Input SQL file path')
    arg_parser.add_argument('-o', '--output', type=Path, required=True,
                            help='Output JSON file path')
    args = arg_parser.parse_args()

    if not args.input.exists():
        print(f"Error: Input file '{args.input}' does not exist")
        return

    # Make sure the destination directory exists before writing.
    args.output.parent.mkdir(parents=True, exist_ok=True)

    try:
        process_sql_file(args.input, args.output)
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment