Skip to content

Instantly share code, notes, and snippets.

@luisdelatorre012
Last active September 13, 2024 16:02
Show Gist options
  • Save luisdelatorre012/fc4af9417ad0a7a5502c637116d0d066 to your computer and use it in GitHub Desktop.
Convert a JSON Schema document into SQL Server (T-SQL) DDL statements.
import json
import re
from typing import Any
def convert_json_schema_to_sql(schema: dict[str, Any]) -> list[str]:
    """Convert a JSON Schema document into SQL Server DDL statements.

    Returns CREATE TABLE statements first, then ALTER TABLE foreign-key
    constraints, then CREATE INDEX statements, so the output can be
    executed top-to-bottom.
    """
    tables: list[str] = []
    foreign_keys: list[str] = []
    indexes: list[str] = []
    definitions = schema.get('definitions', {})
    # Bug fix: scalar top-level properties used to be passed as
    # {prop: details}, but process_object reads obj['properties'], so it
    # saw no columns and emitted one empty, duplicate table per property.
    # Collect them and emit a single root table instead.
    scalar_props: dict[str, Any] = {}
    for prop, details in schema.get('properties', {}).items():
        if details.get('type') == 'array':
            process_array(schema['title'], prop, details, tables, foreign_keys, indexes, definitions)
        elif details.get('type') == 'object':
            process_object(f"{schema['title']}_{prop}", details, tables, foreign_keys, indexes,
                           details.get('required', []), definitions)
        else:
            scalar_props[prop] = details
    if scalar_props:
        process_object(schema['title'], {'properties': scalar_props}, tables,
                       foreign_keys, indexes, schema.get('required', []), definitions)
    return tables + foreign_keys + indexes
def process_object(table_name: str, obj: dict[str, Any], tables: list[str],
                   foreign_keys: list[str], indexes: list[str],
                   required_fields: list[str], definitions: dict[str, Any]) -> None:
    """Emit a CREATE TABLE for *obj* and recurse into nested structures.

    Scalar properties become columns; nested objects, arrays, and
    oneOf/anyOf/allOf unions become child tables linked by foreign keys.
    Generated SQL is appended in place to the tables / foreign_keys /
    indexes accumulators.
    """
    columns = []
    for prop, details in obj.get('properties', {}).items():
        # Resolve $ref against the schema's definitions section.
        if '$ref' in details:
            ref_name = details['$ref'].split('/')[-1]
            if ref_name not in definitions:
                raise ValueError(f"Reference {ref_name} not found in definitions")
            details = definitions[ref_name]
        if 'oneOf' in details or 'anyOf' in details or 'allOf' in details:
            # Bug fix: the FK constraints emitted by process_complex_type
            # reference [<prop>_id], but the column was never added here.
            columns.append(f" [{prop}_id] BIGINT" + (" NOT NULL" if prop in required_fields else ""))
            process_complex_type(table_name, prop, details, tables, foreign_keys, indexes,
                                 prop in required_fields, definitions)
        elif details.get('type') == 'object':
            process_object(f"{table_name}_{prop}", details, tables, foreign_keys, indexes,
                           details.get('required', []), definitions)
            columns.append(f" [{prop}_id] BIGINT" + (" NOT NULL" if prop in required_fields else ""))
            foreign_keys.append(
                f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{prop}] "
                f"FOREIGN KEY ([{prop}_id]) REFERENCES [dbo].[{table_name}_{prop}]([id]);"
            )
        elif details.get('type') == 'array':
            process_array(table_name, prop, details, tables, foreign_keys, indexes, definitions)
        else:
            columns.append(create_column(prop, details, prop in required_fields))
            # Index id columns for faster joins/lookups.
            if prop == 'id':
                indexes.append(f"CREATE INDEX [IX_{table_name}_{prop}] ON [dbo].[{table_name}] ([{prop}]);")
    if 'additionalProperties' in obj:
        handle_additional_properties(table_name, obj['additionalProperties'], tables, foreign_keys, indexes)
    if 'patternProperties' in obj:
        handle_pattern_properties(table_name, obj['patternProperties'], tables, foreign_keys, indexes)
    # Bug fix: joining an empty column list used to leave a dangling comma
    # after the PRIMARY KEY line, producing invalid SQL.
    body = ',\n'.join([" [id] BIGINT IDENTITY(1,1) PRIMARY KEY"] + columns)
    tables.append(f"CREATE TABLE [dbo].[{table_name}] (\n{body}\n);")
def process_array(parent_table: str, prop: str, details: dict[str, Any], tables: list[str],
                  foreign_keys: list[str], indexes: list[str], definitions: dict[str, Any]) -> None:
    """Create a child table for an array property, FK'd back to its parent row.

    Handles object items (one column per item property), tuple validation
    (list of item schemas), nested arrays (recursive child table), and
    scalar items (a single [value] column).
    """
    table_name = f"{parent_table}_{prop}"
    # Robustness: 'items' is optional in JSON Schema; default to an open
    # schema instead of raising KeyError.
    items = details.get('items', {})
    columns = [f" [{parent_table.lower()}_id] BIGINT NOT NULL"]
    has_value_column = False
    if isinstance(items, dict) and items.get('type') == 'object':
        item_required = items.get('required', [])
        for item_prop, item_details in items.get('properties', {}).items():
            columns.append(create_column(item_prop, item_details, item_prop in item_required))
    elif isinstance(items, list):  # Tuple validation: one column per positional schema
        for i, item_schema in enumerate(items):
            columns.append(create_column(f"item_{i}", item_schema, True))
    elif items.get('type') == 'array':  # Array of arrays: recurse into a nested table
        nested_table_name = f"{table_name}_nested"
        process_array(table_name, 'nested', items, tables, foreign_keys, indexes, definitions)
        columns.append(" [value_id] BIGINT NOT NULL")
        foreign_keys.append(
            f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_nested] "
            f"FOREIGN KEY ([value_id]) REFERENCES [dbo].[{nested_table_name}]([id]);"
        )
    else:
        columns.append(f" [value] {get_sql_type(items)}")
        has_value_column = True
    # Bug fix: the UNIQUE constraint references [value], which only exists
    # for scalar items; emitting it for object/tuple items was invalid SQL.
    if details.get('uniqueItems', False) and has_value_column:
        columns.append(f" CONSTRAINT [UQ_{table_name}_unique_item] UNIQUE ([{parent_table.lower()}_id], [value])")
    table_sql = f"CREATE TABLE [dbo].[{table_name}] (\n"
    table_sql += " [id] BIGINT IDENTITY(1,1) PRIMARY KEY,\n"
    table_sql += ',\n'.join(columns)
    table_sql += "\n);"
    tables.append(table_sql)
    foreign_keys.append(
        f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{parent_table}] "
        f"FOREIGN KEY ([{parent_table.lower()}_id]) REFERENCES [dbo].[{parent_table}]([id]);"
    )
def handle_additional_properties(table_name: str, additional_prop_schema: dict[str, Any],
                                 tables: list[str], foreign_keys: list[str], indexes: list[str]) -> None:
    """Create a key/value side table for a schema's additionalProperties.

    Rows hold (parent id, property name, property value); the composite
    primary key makes each extra property unique per parent row.
    """
    # Bug fix: additionalProperties may be a boolean in JSON Schema.
    # False forbids extra properties (nothing to store); True allows any
    # value, which maps to an open {} schema (NVARCHAR(MAX)).
    if additional_prop_schema is False:
        return
    if additional_prop_schema is True:
        additional_prop_schema = {}
    prop_table_name = f"{table_name}_additional_props"
    columns = [
        f" [{table_name.lower()}_id] BIGINT NOT NULL",
        " [prop_name] NVARCHAR(255) NOT NULL",
        f" [prop_value] {get_sql_type(additional_prop_schema)}",
        f" CONSTRAINT [PK_{prop_table_name}] PRIMARY KEY ([{table_name.lower()}_id], [prop_name])"
    ]
    table_sql = f"CREATE TABLE [dbo].[{prop_table_name}] (\n"
    table_sql += ',\n'.join(columns)
    table_sql += "\n);"
    tables.append(table_sql)
    foreign_keys.append(
        f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT [FK_{prop_table_name}_{table_name}] "
        f"FOREIGN KEY ([{table_name.lower()}_id]) REFERENCES [dbo].[{table_name}]([id]);"
    )
def handle_pattern_properties(table_name: str, pattern_properties: dict[str, Any], tables: list[str],
                              foreign_keys: list[str], indexes: list[str]) -> None:
    """Create one key/value side table per patternProperties regex.

    Each table holds (parent id, property name, property value) rows; a
    CHECK constraint approximates the name pattern via LIKE.
    """
    for pattern, schema in pattern_properties.items():
        # Sanitize the regex into an identifier-safe table-name suffix.
        safe_pattern = re.sub(r'\W+', '_', pattern)
        prop_table_name = f"{table_name}_pattern_{safe_pattern}"
        columns = [
            f" [{table_name.lower()}_id] BIGINT NOT NULL",
            " [prop_name] NVARCHAR(255) NOT NULL",
            f" [prop_value] {get_sql_type(schema)}",
            f" CONSTRAINT [PK_{prop_table_name}] PRIMARY KEY ([{table_name.lower()}_id], [prop_name])"
        ]
        table_sql = f"CREATE TABLE [dbo].[{prop_table_name}] (\n"
        table_sql += ',\n'.join(columns)
        table_sql += "\n);"
        tables.append(table_sql)
        foreign_keys.append(
            f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT [FK_{prop_table_name}_{table_name}] "
            f"FOREIGN KEY ([{table_name.lower()}_id]) REFERENCES [dbo].[{table_name}]([id]);"
        )
        # Bug fix: a single quote in the pattern would break the generated SQL;
        # T-SQL doubles quotes inside string literals.
        # NOTE(review): T-SQL LIKE wildcards are not regex, so this CHECK only
        # approximates the JSON Schema pattern — confirm whether a proper
        # translation (or dropping the constraint) is wanted.
        escaped_pattern = pattern.replace("'", "''")
        check_constraint = (f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT "
                            f"[CHK_{prop_table_name}_pattern] CHECK ([prop_name] LIKE '{escaped_pattern}');")
        tables.append(check_constraint)
def process_complex_type(table_name: str, prop: str, details: dict[str, any], tables: list[str], foreign_keys: list[str], indexes: list[str], is_required: bool, definitions: dict[str, any]) -> None:
    """Create one table per oneOf/anyOf/allOf variant and FK the parent to each.

    NOTE(review): every variant's FK targets the same [<prop>_id] column on
    the parent, which SQL Server only satisfies if the id exists in *all*
    variant tables — presumably intended for allOf but questionable for
    oneOf/anyOf; confirm intent.
    """
    variants = details.get('oneOf') or details.get('anyOf') or details.get('allOf')
    for idx, variant in enumerate(variants):
        variant_table = f"{table_name}_{prop}_type{idx}"
        process_object(variant_table, variant, tables, foreign_keys, indexes,
                       variant.get('required', []), definitions)
        foreign_keys.append(
            f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{prop}_type{idx}] "
            f"FOREIGN KEY ([{prop}_id]) REFERENCES [dbo].[{variant_table}]([id]);"
        )
def create_column(name: str, details: dict[str, any], is_required: bool) -> str:
    """Render a single column definition line: bracketed name, SQL type, constraints."""
    return f" [{name}] {get_sql_type(details)}{get_constraints(name, details, is_required)}"
def get_sql_type(details: dict[str, Any]) -> str:
    """Map a JSON Schema type description to a SQL Server column type.

    Unknown or mixed types fall back to NVARCHAR(MAX); strings are sized
    from maxLength or the longest enum member when available.
    """
    type_mapping = {
        'integer': 'BIGINT',
        'string': 'NVARCHAR(255)',
        'boolean': 'BIT',
        'number': 'FLOAT',
    }
    # A list of types (e.g. ["string", "null"]) has no single SQL type.
    if isinstance(details.get('type'), list):
        return 'NVARCHAR(MAX)'
    if details.get('type') == 'string':
        if details.get('format') == 'date-time':
            return 'DATETIME2'
        if details.get('contentMediaType') == 'application/json':
            return 'NVARCHAR(MAX)'
        if 'maxLength' in details:
            return f"NVARCHAR({details['maxLength']})"
        if 'enum' in details:
            # Bug fix: max() raised ValueError on an (invalid but possible)
            # empty enum; default=1 keeps the generated DDL well-formed.
            return f"NVARCHAR({max((len(str(v)) for v in details['enum']), default=1)})"
    return type_mapping.get(details.get('type'), 'NVARCHAR(MAX)')
def _sql_literal(value: Any) -> str:
    """Render a Python value as a T-SQL literal.

    Booleans become BIT literals, numbers stay unquoted, and everything
    else is single-quoted with embedded quotes doubled.
    """
    if isinstance(value, bool):  # must precede int: bool is an int subclass
        return '1' if value else '0'
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def get_constraints(name: str, details: dict[str, Any], is_required: bool) -> str:
    """Build the constraint suffix (NOT NULL / CHECK clauses) for a column.

    Returns a leading-space-prefixed string, or '' when no constraint
    applies.
    """
    constraints = []
    # Bug fix: the old `'null' not in details.get('type', [])` did an
    # accidental substring test when type is a plain string; only an
    # explicit type list like ["string", "null"] marks a column nullable.
    type_spec = details.get('type')
    nullable = isinstance(type_spec, list) and 'null' in type_spec
    if is_required and not nullable:
        constraints.append('NOT NULL')
    if 'minimum' in details:
        constraints.append(f"CHECK ([{name}] >= {details['minimum']})")
    if 'maximum' in details:
        constraints.append(f"CHECK ([{name}] <= {details['maximum']})")
    # Bug fix: enum/const values were always single-quoted, so numeric and
    # boolean values produced type-mismatched literals like '5'.
    if 'enum' in details:
        enum_values = ', '.join(_sql_literal(v) for v in details['enum'])
        constraints.append(f"CHECK ([{name}] IN ({enum_values}))")
    if 'const' in details:
        constraints.append(f"CHECK ([{name}] = {_sql_literal(details['const'])})")
    return ' ' + ' '.join(constraints) if constraints else ''
def main() -> None:
    """Load the schema file and print the generated DDL statements."""
    # Explicit encoding so the script behaves the same on every platform.
    with open('ecommerce_schema.json', 'r', encoding='utf-8') as f:
        schema = json.load(f)
    for statement in convert_json_schema_to_sql(schema):
        print(statement)
        print()


# Guarded entry point so importing this module has no side effects.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment