Skip to content

Instantly share code, notes, and snippets.

@luisdelatorre012
Last active September 30, 2024 11:37
Show Gist options
  • Save luisdelatorre012/9c2fbe0a983f9fa0e1917990252ab49a to your computer and use it in GitHub Desktop.
Recursively walk dict
import json
import pyodbc
import re
from datetime import date, datetime
from typing import Any, Dict, Optional, List
def load_json(file_path: str) -> Any:
with open(file_path, 'r') as file:
return json.load(file)
def is_simple_type(value: Any) -> bool:
    """Return True when *value* is a scalar that maps directly to a SQL column."""
    scalar_types = (str, int, float, bool, type(None), date, datetime)
    return isinstance(value, scalar_types)
def sanitize_identifier(name: str) -> str:
    """Make *name* safe to use as a SQL identifier.

    Runs of non-word characters are collapsed into single underscores,
    and a leading digit is guarded with an underscore prefix.
    """
    cleaned = re.sub(r'\W+', '_', name)
    return '_' + cleaned if re.match(r'^\d', cleaned) else cleaned
def connect_to_db() -> pyodbc.Connection:
    """Open and return a connection to the SQL Server database.

    Raises:
        pyodbc.Error: Re-raised after logging when the connection fails.
    """
    # NOTE(review): credentials are hard-coded placeholders — they should
    # come from configuration or environment variables in real use.
    connection_string = (
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=your_server;'
        'DATABASE=your_database;'
        'UID=your_username;'
        'PWD=your_password'
    )
    try:
        return pyodbc.connect(connection_string)
    except pyodbc.Error as exc:
        print(f"Error connecting to database: {exc}")
        raise
def insert_data(
    cursor: pyodbc.Cursor,
    table_name: str,
    data: Dict[str, Any]
) -> Optional[int]:
    """Insert one row into *table_name* and return the generated ID.

    Args:
        cursor: Open pyodbc cursor.
        table_name: Target table; sanitized before interpolation into SQL.
        data: Column-name -> value mapping; keys become column names.

    Returns:
        The new row's ID (via ``OUTPUT INSERTED.ID``), or ``None`` when
        *data* is empty and nothing is inserted.  (The original annotation
        claimed ``int`` but this path has always returned ``None``.)

    Raises:
        pyodbc.Error: Re-raised after logging on insert failure.
    """
    if not data:
        # No data to insert
        return None
    sanitized_table_name = sanitize_identifier(table_name)
    # Column names cannot be parameterized; escape ']' so a hostile JSON
    # key cannot break out of the bracketed identifier.
    columns = ', '.join(f"[{col.replace(']', ']]')}]" for col in data.keys())
    placeholders = ', '.join('?' for _ in data)
    query = f"INSERT INTO [{sanitized_table_name}] ({columns}) OUTPUT INSERTED.ID VALUES ({placeholders});"
    values = list(data.values())
    try:
        cursor.execute(query, values)
        inserted_id = cursor.fetchone()[0]
        return inserted_id  # Actual ID from the database
    except pyodbc.Error as e:
        print(f"Error inserting data into {sanitized_table_name}: {e}")
        raise
def insert_batch_data(
    cursor: pyodbc.Cursor,
    table_name: str,
    data_list: List[Dict[str, Any]]
) -> List[int]:
    """Insert every dict in *data_list* into *table_name*; return the new IDs.

    All dicts are expected to contain the key set of the first dict; values
    are aligned by column name rather than by dict order.

    Args:
        cursor: Open pyodbc cursor.
        table_name: Target table; sanitized before interpolation into SQL.
        data_list: Rows to insert, one dict per row.

    Returns:
        IDs generated for each inserted row, in input order; ``[]`` when
        *data_list* is empty.

    Raises:
        pyodbc.Error: Re-raised after logging on insert failure.
    """
    if not data_list:
        return []
    sanitized_table_name = sanitize_identifier(table_name)
    columns = list(data_list[0].keys())
    # Escape ']' so a hostile JSON key cannot break out of the brackets.
    columns_clause = ', '.join(f"[{col.replace(']', ']]')}]" for col in columns)
    placeholders = ', '.join('?' for _ in columns)
    query = f"INSERT INTO [{sanitized_table_name}] ({columns_clause}) OUTPUT INSERTED.ID VALUES ({placeholders})"
    try:
        # BUG FIX: executemany() does not accumulate the OUTPUT result set
        # for each row (and fast_executemany does not support OUTPUT at
        # all), so the original fetchall() could not return one ID per row.
        # Insert row by row and collect each generated ID.
        inserted_ids: List[int] = []
        for row in data_list:
            cursor.execute(query, [row[col] for col in columns])
            inserted_ids.append(cursor.fetchone()[0])
        return inserted_ids
    except pyodbc.Error as e:
        print(f"Error inserting batch data into {sanitized_table_name}: {e}")
        raise
def process_json(
    cursor: pyodbc.Cursor,
    json_data: Any,
    table_name: str,
    parent_table_name: Optional[str] = None,
    parent_id: Optional[int] = None
) -> None:
    """Recursively walk *json_data* and insert it into relational tables.

    Mapping:
      * dict  -> one row in *table_name* built from its scalar members;
        nested dicts/lists recurse into child tables named
        ``{table_name}_{key}`` linked via a ``{parent_table_name}_id`` column.
      * list of scalars -> one row per item (column ``value``), batched.
      * list of complex items -> each item processed as its own row.
      * bare scalar -> a single row with a ``value`` column.

    Args:
        cursor: Open pyodbc cursor used for all inserts.
        json_data: Any JSON-decoded value.
        table_name: Table receiving the current node.
        parent_table_name: Table of the parent node, if any.
        parent_id: Row ID of the parent node, if any.
    """
    def _parent_link() -> Dict[str, Any]:
        # Foreign-key column pointing at the parent row, when one exists.
        if parent_table_name and parent_id is not None:
            return {f"{sanitize_identifier(parent_table_name)}_id": parent_id}
        return {}

    if is_simple_type(json_data):
        data = {'value': json_data}
        data.update(_parent_link())
        # BUG FIX: the original appended this row to a local batch_data
        # list and never inserted it, silently dropping bare scalars.
        insert_data(cursor, table_name, data)
    elif isinstance(json_data, dict):
        # Scalar members become the current row's columns.
        data = {k: v for k, v in json_data.items() if is_simple_type(v)}
        data.update(_parent_link())
        # Insert the current node first so children can reference its ID.
        # NOTE(review): if the dict has no scalar members, insert_data
        # returns None and children are stored without a parent link.
        row_id = insert_data(cursor, table_name, data)
        for key, value in json_data.items():
            if isinstance(value, (dict, list)):
                child_table_name = f"{table_name}_{key}"
                process_json(cursor, value, child_table_name, table_name, row_id)
    elif isinstance(json_data, list):
        if all(is_simple_type(item) for item in json_data):
            # Homogeneous scalar list: batch insert one row per item.
            batch_data = []
            for item in json_data:
                data = {'value': item}
                data.update(_parent_link())
                batch_data.append(data)
            insert_batch_data(cursor, table_name, batch_data)
        else:
            # List contains complex items: recurse into each.
            for item in json_data:
                process_json(cursor, item, table_name, parent_table_name, parent_id)
    else:
        # Non-simple, non-dict, non-list type: ignored, as before.
        pass
def main() -> None:
    """Load the example JSON file and persist it within a single transaction."""
    json_file_path = 'example_message.json'
    payload = load_json(json_file_path)
    conn = connect_to_db()
    cursor = conn.cursor()
    cursor.fast_executemany = True  # Enable fast executemany
    try:
        conn.autocommit = False  # everything below runs as one transaction
        process_json(cursor, payload, 'root_table')
        conn.commit()
        print("Data inserted successfully")
    except Exception as error:
        conn.rollback()
        print(f"An error occurred: {error}")
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment