JSON Schema to SQL
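A small Python script that walks a JSON Schema document and emits T-SQL DDL: a surrogate-keyed CREATE TABLE per object and array, ALTER TABLE statements for foreign keys and pattern CHECKs, and CREATE INDEX statements for id columns. The worked examples in the comments below are hypothetical sketches, not output from a real schema.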
import json
import re
from typing import Any


def convert_json_schema_to_sql(schema: dict[str, Any]) -> list[str]:
    tables: list[str] = []
    foreign_keys: list[str] = []
    indexes: list[str] = []
    definitions = schema.get('definitions', {})
    scalar_props: dict[str, Any] = {}
    # Dispatch top-level properties: arrays and objects get their own tables,
    # scalars are collected into a single root table named after the schema title.
    for prop, details in schema.get('properties', {}).items():
        if details.get('type') == 'array':
            process_array(schema['title'], prop, details, tables, foreign_keys, indexes, definitions)
        elif details.get('type') == 'object':
            process_object(f"{schema['title']}_{prop}", details, tables, foreign_keys, indexes, details.get('required', []), definitions)
        else:
            scalar_props[prop] = details
    if scalar_props:
        process_object(schema['title'], {'properties': scalar_props}, tables, foreign_keys, indexes, schema.get('required', []), definitions)
    # Emit all CREATE TABLE statements first so the ALTER TABLE constraints can resolve.
    return tables + foreign_keys + indexes
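

# Dispatch sketch (hypothetical schema, for illustration only):
#   {"title": "Shop", "properties": {
#       "tags":    {"type": "array",  "items": {"type": "string"}},  -> process_array  -> [Shop_tags]
#       "address": {"type": "object", "properties": {...}},          -> process_object -> [Shop_address]
#       "name":    {"type": "string"}}}                              -> column on the root [Shop] table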
def process_object(table_name: str, obj: dict[str, Any], tables: list[str], foreign_keys: list[str], indexes: list[str], required_fields: list[str], definitions: dict[str, Any]) -> None:
    columns = []
    # NOTE: assumes the schema does not define its own 'id' property, which would
    # collide with the surrogate [id] key added below.
    for prop, details in obj.get('properties', {}).items():
        if '$ref' in details:
            ref_name = details['$ref'].split('/')[-1]
            if ref_name in definitions:
                details = definitions[ref_name]
            else:
                raise ValueError(f"Reference {ref_name} not found in definitions")
        if 'oneOf' in details or 'anyOf' in details or 'allOf' in details:
            # The subtype tables are created by process_complex_type; the FK column
            # they reference has to live on this table.
            columns.append(f"    [{prop}_id] BIGINT" + (" NOT NULL" if prop in required_fields else ""))
            process_complex_type(table_name, prop, details, tables, foreign_keys, indexes, prop in required_fields, definitions)
        elif details.get('type') == 'object':
            process_object(f"{table_name}_{prop}", details, tables, foreign_keys, indexes, details.get('required', []), definitions)
            columns.append(f"    [{prop}_id] BIGINT" + (" NOT NULL" if prop in required_fields else ""))
            fk = f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{prop}] FOREIGN KEY ([{prop}_id]) REFERENCES [dbo].[{table_name}_{prop}]([id]);"
            foreign_keys.append(fk)
        elif details.get('type') == 'array':
            process_array(table_name, prop, details, tables, foreign_keys, indexes, definitions)
        else:
            columns.append(create_column(prop, details, prop in required_fields))
            # Add an index for id columns
            if prop == 'id':
                indexes.append(f"CREATE INDEX [IX_{table_name}_{prop}] ON [dbo].[{table_name}] ([{prop}]);")
    if 'additionalProperties' in obj:
        handle_additional_properties(table_name, obj['additionalProperties'], tables, foreign_keys, indexes)
    if 'patternProperties' in obj:
        handle_pattern_properties(table_name, obj['patternProperties'], tables, foreign_keys, indexes)
    table_sql = f"CREATE TABLE [dbo].[{table_name}] (\n"
    table_sql += "    [id] BIGINT IDENTITY(1,1) PRIMARY KEY"
    if columns:
        table_sql += ",\n" + ',\n'.join(columns)
    table_sql += "\n);"
    tables.append(table_sql)
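

# Sketch: a nested object property such as
#   {"address": {"type": "object", "properties": {"city": {"type": "string"}}}}
# on table [Customer] emits a [Customer_address] table plus, on [Customer], an
#   [address_id] BIGINT column with FK_Customer_address -> [Customer_address]([id])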
def process_array(parent_table: str, prop: str, details: dict[str, Any], tables: list[str], foreign_keys: list[str], indexes: list[str], definitions: dict[str, Any]) -> None:
    table_name = f"{parent_table}_{prop}"
    columns = [f"    [{parent_table.lower()}_id] BIGINT NOT NULL"]
    items = details.get('items', {})
    if isinstance(items, dict) and items.get('type') == 'object':
        for item_prop, item_details in items.get('properties', {}).items():
            columns.append(create_column(item_prop, item_details, item_prop in items.get('required', [])))
    elif isinstance(items, list):  # Tuple validation: one column per position
        for i, item_schema in enumerate(items):
            columns.append(create_column(f"item_{i}", item_schema, True))
    elif isinstance(items, dict) and items.get('type') == 'array':  # Array of arrays
        nested_table_name = f"{table_name}_nested"
        process_array(table_name, 'nested', items, tables, foreign_keys, indexes, definitions)
        columns.append("    [value_id] BIGINT NOT NULL")
        fk = f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_nested] FOREIGN KEY ([value_id]) REFERENCES [dbo].[{nested_table_name}]([id]);"
        foreign_keys.append(fk)
    else:  # Scalar items map to a single [value] column
        columns.append(f"    [value] {get_sql_type(items)}")
        # uniqueItems is only enforceable here, where a [value] column exists
        if details.get('uniqueItems', False):
            columns.append(f"    CONSTRAINT [UQ_{table_name}_unique_item] UNIQUE ([{parent_table.lower()}_id], [value])")
    table_sql = f"CREATE TABLE [dbo].[{table_name}] (\n"
    table_sql += "    [id] BIGINT IDENTITY(1,1) PRIMARY KEY,\n"
    table_sql += ',\n'.join(columns)
    table_sql += "\n);"
    tables.append(table_sql)
    fk = f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{parent_table}] FOREIGN KEY ([{parent_table.lower()}_id]) REFERENCES [dbo].[{parent_table}]([id]);"
    foreign_keys.append(fk)
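

# Sketch: an "items" array of objects, e.g.
#   {"items": {"type": "array", "items": {"type": "object",
#              "properties": {"sku": {"type": "string"}}}}}
# on parent [Order] becomes:
#   CREATE TABLE [dbo].[Order_items] (
#       [id] BIGINT IDENTITY(1,1) PRIMARY KEY,
#       [order_id] BIGINT NOT NULL,
#       [sku] NVARCHAR(255)
#   );
#   ALTER TABLE [dbo].[Order_items] ADD CONSTRAINT [FK_Order_items_Order]
#       FOREIGN KEY ([order_id]) REFERENCES [dbo].[Order]([id]);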
def handle_additional_properties(table_name: str, additional_prop_schema: Any, tables: list[str], foreign_keys: list[str], indexes: list[str]) -> None:
    # additionalProperties may be a boolean: false forbids extras (nothing to store),
    # true allows any value (falls through to NVARCHAR(MAX)).
    if additional_prop_schema is False:
        return
    if additional_prop_schema is True:
        additional_prop_schema = {}
    prop_table_name = f"{table_name}_additional_props"
    columns = [
        f"    [{table_name.lower()}_id] BIGINT NOT NULL",
        "    [prop_name] NVARCHAR(255) NOT NULL",
        f"    [prop_value] {get_sql_type(additional_prop_schema)}",
        f"    CONSTRAINT [PK_{prop_table_name}] PRIMARY KEY ([{table_name.lower()}_id], [prop_name])"
    ]
    table_sql = f"CREATE TABLE [dbo].[{prop_table_name}] (\n"
    table_sql += ',\n'.join(columns)
    table_sql += "\n);"
    tables.append(table_sql)
    fk = f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT [FK_{prop_table_name}_{table_name}] FOREIGN KEY ([{table_name.lower()}_id]) REFERENCES [dbo].[{table_name}]([id]);"
    foreign_keys.append(fk)
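

# Sketch: "additionalProperties": {"type": "string"} on [Config] yields a
# [Config_additional_props] key/value table with [prop_value] NVARCHAR(255),
# keyed on ([config_id], [prop_name]).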
def handle_pattern_properties(table_name: str, pattern_properties: dict[str, Any], tables: list[str], foreign_keys: list[str], indexes: list[str]) -> None:
    for pattern, schema in pattern_properties.items():
        safe_pattern = re.sub(r'\W+', '_', pattern)
        prop_table_name = f"{table_name}_pattern_{safe_pattern}"
        columns = [
            f"    [{table_name.lower()}_id] BIGINT NOT NULL",
            "    [prop_name] NVARCHAR(255) NOT NULL",
            f"    [prop_value] {get_sql_type(schema)}",
            f"    CONSTRAINT [PK_{prop_table_name}] PRIMARY KEY ([{table_name.lower()}_id], [prop_name])"
        ]
        table_sql = f"CREATE TABLE [dbo].[{prop_table_name}] (\n"
        table_sql += ',\n'.join(columns)
        table_sql += "\n);"
        tables.append(table_sql)
        fk = f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT [FK_{prop_table_name}_{table_name}] FOREIGN KEY ([{table_name.lower()}_id]) REFERENCES [dbo].[{table_name}]([id]);"
        foreign_keys.append(fk)
        # CAUTION: T-SQL LIKE is not a regex engine, so this CHECK is only a loose
        # approximation of the JSON Schema pattern. Escape quotes to keep the DDL valid.
        escaped_pattern = pattern.replace("'", "''")
        check_constraint = f"ALTER TABLE [dbo].[{prop_table_name}] ADD CONSTRAINT [CHK_{prop_table_name}_pattern] CHECK ([prop_name] LIKE '{escaped_pattern}');"
        tables.append(check_constraint)
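

# Sketch: a pattern key "^x-" is sanitized to "_x_", producing table
# [Config_pattern__x_] plus CHECK ([prop_name] LIKE '^x-') -- a loose stand-in,
# since LIKE does not interpret regex anchors.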
def process_complex_type(table_name: str, prop: str, details: dict[str, Any], tables: list[str], foreign_keys: list[str], indexes: list[str], is_required: bool, definitions: dict[str, Any]) -> None:
    # A rough relational approximation: each oneOf/anyOf/allOf subtype gets its own
    # table, and the caller's [{prop}_id] column carries one FK per subtype.
    subtypes = details.get('oneOf') or details.get('anyOf') or details.get('allOf')
    for i, subtype in enumerate(subtypes):
        subtype_name = f"{table_name}_{prop}_type{i}"
        process_object(subtype_name, subtype, tables, foreign_keys, indexes, subtype.get('required', []), definitions)
        fk = f"ALTER TABLE [dbo].[{table_name}] ADD CONSTRAINT [FK_{table_name}_{prop}_type{i}] FOREIGN KEY ([{prop}_id]) REFERENCES [dbo].[{subtype_name}]([id]);"
        foreign_keys.append(fk)
def create_column(name: str, details: dict[str, Any], is_required: bool) -> str:
    sql_type = get_sql_type(details)
    constraints = get_constraints(name, details, is_required)
    return f"    [{name}] {sql_type}{constraints}"


def get_sql_type(details: dict[str, Any]) -> str:
    type_mapping = {
        'integer': 'BIGINT',
        'string': 'NVARCHAR(255)',
        'boolean': 'BIT',
        'number': 'FLOAT'
    }
    if isinstance(details.get('type'), list):  # Mixed/union types fall back to text
        return 'NVARCHAR(MAX)'
    if details.get('type') == 'string':
        if details.get('format') == 'date-time':
            return 'DATETIME2'
        elif details.get('contentMediaType') == 'application/json':
            return 'NVARCHAR(MAX)'
        elif 'maxLength' in details:
            return f"NVARCHAR({details['maxLength']})"
        elif 'enum' in details:
            # Size the column to the longest enum member
            return f"NVARCHAR({max(len(str(v)) for v in details['enum'])})"
    return type_mapping.get(details.get('type'), 'NVARCHAR(MAX)')
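

# Mapping sketches:
#   {"type": "string", "maxLength": 50}           -> NVARCHAR(50)
#   {"type": "string", "format": "date-time"}     -> DATETIME2
#   {"type": "string", "enum": ["red", "green"]}  -> NVARCHAR(5)
#   {"type": ["string", "null"]}                  -> NVARCHAR(MAX)
#   {"type": "integer"}                           -> BIGINT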
def get_constraints(name: str, details: dict[str, Any], is_required: bool) -> str:
    constraints = []
    declared = details.get('type', [])
    declared_types = declared if isinstance(declared, list) else [declared]
    if is_required and 'null' not in declared_types:
        constraints.append('NOT NULL')
    if 'minimum' in details:
        constraints.append(f"CHECK ([{name}] >= {details['minimum']})")
    if 'maximum' in details:
        constraints.append(f"CHECK ([{name}] <= {details['maximum']})")
    if 'enum' in details:
        enum_values = ', '.join(f"'{v}'" for v in details['enum'])
        constraints.append(f"CHECK ([{name}] IN ({enum_values}))")
    if 'const' in details:
        # Quote string constants; leave numeric constants bare
        const = details['const']
        literal = f"'{const}'" if isinstance(const, str) else const
        constraints.append(f"CHECK ([{name}] = {literal})")
    return ' ' + ' '.join(constraints) if constraints else ''
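

# Sketch: create_column("qty", {"type": "integer", "minimum": 0, "maximum": 99}, True)
#   -> "    [qty] BIGINT NOT NULL CHECK ([qty] >= 0) CHECK ([qty] <= 99)"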
if __name__ == '__main__':
    # Load the JSON schema
    with open('ecommerce_schema.json', 'r', encoding='utf-8') as f:
        schema = json.load(f)

    # Convert the schema to SQL DDL statements and print them
    sql_statements = convert_json_schema_to_sql(schema)
    for statement in sql_statements:
        print(statement)
        print()
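

# End-to-end sketch, assuming ecommerce_schema.json contained only:
#   {"title": "Order", "required": ["order_number"],
#    "properties": {"order_number": {"type": "string", "maxLength": 20},
#                   "total":        {"type": "number", "minimum": 0}}}
# the script would print:
#   CREATE TABLE [dbo].[Order] (
#       [id] BIGINT IDENTITY(1,1) PRIMARY KEY,
#       [order_number] NVARCHAR(20) NOT NULL,
#       [total] FLOAT CHECK ([total] >= 0)
#   );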