Last active
September 30, 2024 11:37
-
-
Save luisdelatorre012/9c2fbe0a983f9fa0e1917990252ab49a to your computer and use it in GitHub Desktop.
Recursively walk dict
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import pyodbc | |
import re | |
from datetime import date, datetime | |
from typing import Any, Dict, Optional, List | |
def load_json(file_path: str) -> Any: | |
with open(file_path, 'r') as file: | |
return json.load(file) | |
def is_simple_type(value: Any) -> bool: | |
return isinstance(value, (str, int, float, bool, type(None), date, datetime)) | |
def sanitize_identifier(name: str) -> str: | |
# Replace non-word characters with underscores and ensure it doesn't start with a number | |
name = re.sub(r'\W+', '_', name) | |
if re.match(r'^\d', name): | |
name = '_' + name | |
return name | |
def connect_to_db() -> pyodbc.Connection: | |
try: | |
conn = pyodbc.connect( | |
'DRIVER={ODBC Driver 17 for SQL Server};' | |
'SERVER=your_server;' | |
'DATABASE=your_database;' | |
'UID=your_username;' | |
'PWD=your_password' | |
) | |
return conn | |
except pyodbc.Error as e: | |
print(f"Error connecting to database: {e}") | |
raise | |
def insert_data( | |
cursor: pyodbc.Cursor, | |
table_name: str, | |
data: Dict[str, Any] | |
) -> int: | |
if not data: | |
# No data to insert | |
return None | |
sanitized_table_name = sanitize_identifier(table_name) | |
columns = ', '.join(f'[{col}]' for col in data.keys()) | |
placeholders = ', '.join('?' for _ in data) | |
query = f"INSERT INTO [{sanitized_table_name}] ({columns}) OUTPUT INSERTED.ID VALUES ({placeholders});" | |
values = list(data.values()) | |
try: | |
cursor.execute(query, values) | |
inserted_id = cursor.fetchone()[0] | |
return inserted_id # Actual ID from the database | |
except pyodbc.Error as e: | |
print(f"Error inserting data into {sanitized_table_name}: {e}") | |
raise | |
def insert_batch_data( | |
cursor: pyodbc.Cursor, | |
table_name: str, | |
data_list: List[Dict[str, Any]] | |
) -> List[int]: | |
if not data_list: | |
return [] | |
sanitized_table_name = sanitize_identifier(table_name) | |
columns = data_list[0].keys() | |
columns_clause = ', '.join(f'[{col}]' for col in columns) | |
placeholders = ', '.join('?' for _ in columns) | |
query = f"INSERT INTO [{sanitized_table_name}] ({columns_clause}) OUTPUT INSERTED.ID VALUES ({placeholders})" | |
values = [list(data.values()) for data in data_list] | |
try: | |
cursor.executemany(query, values) | |
# Fetch all inserted IDs | |
inserted_ids = [row[0] for row in cursor.fetchall()] | |
return inserted_ids | |
except pyodbc.Error as e: | |
print(f"Error inserting batch data into {sanitized_table_name}: {e}") | |
raise | |
def process_json( | |
cursor: pyodbc.Cursor, | |
json_data: Any, | |
table_name: str, | |
parent_table_name: Optional[str] = None, | |
parent_id: Optional[int] = None | |
) -> None: | |
# Collect data for batch insertion | |
batch_data = [] | |
child_nodes = [] | |
if is_simple_type(json_data): | |
# Wrap the simple type in a dictionary with a default key | |
data = {'value': json_data} | |
if parent_table_name and parent_id is not None: | |
parent_column_name = f"{sanitize_identifier(parent_table_name)}_id" | |
data[parent_column_name] = parent_id | |
batch_data.append(data) | |
elif isinstance(json_data, dict): | |
# Extract simple types for current node | |
data = {k: v for k, v in json_data.items() if is_simple_type(v)} | |
if parent_table_name and parent_id is not None: | |
parent_column_name = f"{sanitize_identifier(parent_table_name)}_id" | |
data[parent_column_name] = parent_id | |
# Insert current node to get its ID | |
row_id = insert_data(cursor, table_name, data) | |
# Collect child nodes for processing | |
for key, value in json_data.items(): | |
if isinstance(value, (dict, list)): | |
child_table_name = f"{table_name}_{key}" | |
child_nodes.append((value, child_table_name, table_name, row_id)) | |
else: | |
# Simple types are already handled | |
pass | |
# Process child nodes | |
for child_data, child_table, parent_table, p_id in child_nodes: | |
process_json(cursor, child_data, child_table, parent_table, p_id) | |
elif isinstance(json_data, list): | |
# Check if all items are simple types | |
if all(is_simple_type(item) for item in json_data): | |
for item in json_data: | |
data = {'value': item} | |
if parent_table_name and parent_id is not None: | |
parent_column_name = f"{sanitize_identifier(parent_table_name)}_id" | |
data[parent_column_name] = parent_id | |
batch_data.append(data) | |
# Batch insert all simple items in the list | |
insert_batch_data(cursor, table_name, batch_data) | |
else: | |
# List contains complex types | |
for item in json_data: | |
process_json(cursor, item, table_name, parent_table_name, parent_id) | |
else: | |
# Non-simple, non-dict, non-list type | |
pass # Ignore or handle accordingly | |
def main() -> None: | |
json_file_path = 'example_message.json' | |
json_data = load_json(json_file_path) | |
conn = connect_to_db() | |
cursor = conn.cursor() | |
cursor.fast_executemany = True # Enable fast executemany | |
try: | |
conn.autocommit = False # Start transaction | |
process_json(cursor, json_data, 'root_table') | |
conn.commit() | |
print("Data inserted successfully") | |
except Exception as e: | |
conn.rollback() | |
print(f"An error occurred: {e}") | |
finally: | |
cursor.close() | |
conn.close() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment