Last active
March 19, 2026 11:48
-
-
Save bradmartin333/bf883d7bd9e4a204512ec4eea9d809f5 to your computer and use it in GitHub Desktop.
Write the contents of a JSON file to a PostgreSQL database table
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [ | |
| # "psycopg2-binary", | |
| # ] | |
| # /// | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import Literal | |
| import psycopg2 | |
| from psycopg2 import Error | |
| from psycopg2 import sql | |
| from psycopg2.extras import Json | |
| from conf import DB_CONFIG | |
| TABLE_NAME = "json_imports" | |
| PgType = Literal["BOOLEAN", "BIGINT", "DOUBLE PRECISION", "TEXT", "JSONB"] | |
def load_json(path: Path):
    """Parse and return the JSON document stored at *path* (read as UTF-8)."""
    return json.loads(path.read_text(encoding="utf-8"))
def normalize_records(data) -> list[dict]:
    """Coerce the parsed JSON root into a list of record dicts.

    A single object becomes a one-element list; a list is accepted only when
    every element is an object. Anything else raises ValueError.
    """
    if isinstance(data, dict):
        return [data]
    if isinstance(data, list):
        if all(isinstance(entry, dict) for entry in data):
            return data
    raise ValueError("JSON root must be an object or a list of objects")
def collect_columns(records: list[dict]) -> list[str]:
    """Return every key appearing across *records*, deduplicated, in
    first-seen order.

    Relies on dict insertion-order preservation to keep the ordering while
    deduplicating.
    """
    ordered: dict[str, None] = {}
    for record in records:
        ordered.update(dict.fromkeys(record))
    return list(ordered)
def value_kind(value) -> str:
    """Classify *value* into a coarse kind label used for type inference.

    Returns one of: "null", "bool", "int", "float", "json", "text".
    """
    if value is None:
        return "null"
    # NOTE: bool must be tested before int — bool is a subclass of int.
    checks = (
        (bool, "bool"),
        (int, "int"),
        (float, "float"),
        ((dict, list), "json"),
    )
    for types, kind in checks:
        if isinstance(value, types):
            return kind
    return "text"
def infer_column_type(records: list[dict], column: str) -> PgType:
    """Pick the narrowest Postgres type that fits every non-null value
    observed for *column* across *records*.

    Columns with no non-null values default to TEXT; mixed int/float widens
    to DOUBLE PRECISION; any container value forces JSONB; everything else
    falls back to TEXT.
    """
    observed = {
        value_kind(record[column]) for record in records if column in record
    } - {"null"}
    if not observed:
        return "TEXT"
    if observed == {"bool"}:
        return "BOOLEAN"
    if observed == {"int"}:
        return "BIGINT"
    if observed.issubset({"int", "float"}):
        return "DOUBLE PRECISION"
    return "JSONB" if "json" in observed else "TEXT"
def infer_schema(records: list[dict], columns: list[str]) -> dict[str, PgType]:
    """Map each column name to its inferred Postgres type."""
    schema: dict[str, PgType] = {}
    for column in columns:
        schema[column] = infer_column_type(records, column)
    return schema
def to_db_value(value, pg_type: "PgType"):
    """Convert a Python value into a parameter psycopg2 can bind for *pg_type*.

    Returns None for SQL NULL, wraps values headed for JSONB columns in
    psycopg2's Json adapter, serializes containers stored in TEXT columns
    (a mixed-kind column inferred as TEXT may still hold dicts/lists), and
    passes every other value through unchanged.
    """
    if value is None:
        return None
    if pg_type == "JSONB":
        # Json() adapts any JSON-serializable value — container or scalar —
        # so the original's isinstance branch (which returned Json(value) on
        # both paths) was dead code and is collapsed to a single return.
        return Json(value)
    if pg_type == "TEXT" and isinstance(value, (dict, list)):
        return json.dumps(value)
    return value
def ensure_table(cursor, schema: dict[str, PgType]) -> None:
    """Create the target table if absent and add any missing data columns.

    The base table carries only a surrogate key and a creation timestamp;
    every column from *schema* is then added idempotently via
    ADD COLUMN IF NOT EXISTS, so re-imports with new keys extend the table.
    """
    create_stmt = sql.SQL(
        """
        CREATE TABLE IF NOT EXISTS {} (
            id BIGSERIAL PRIMARY KEY,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );
        """
    ).format(sql.Identifier(TABLE_NAME))
    cursor.execute(create_stmt)
    add_column = sql.SQL("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}")
    for name, pg_type in schema.items():
        # pg_type comes from the closed PgType literal set, so it is safe
        # to splice as raw SQL; identifiers are quoted via sql.Identifier.
        cursor.execute(
            add_column.format(
                sql.Identifier(TABLE_NAME),
                sql.Identifier(name),
                sql.SQL(pg_type),
            )
        )
def insert_records(
    cursor, columns: list[str], schema: dict[str, PgType], records: list[dict]
) -> int:
    """Insert every record into the table and return the number of rows written.

    Keys missing from a record are bound as NULL for that column.
    """
    if not columns:
        # No data columns at all — an empty column list would produce
        # malformed SQL, so report zero rows inserted.
        return 0
    statement = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
        sql.Identifier(TABLE_NAME),
        sql.SQL(", ").join(map(sql.Identifier, columns)),
        sql.SQL(", ").join([sql.Placeholder()] * len(columns)),
    )
    count = 0
    for record in records:
        params = tuple(
            to_db_value(record.get(name), schema[name]) for name in columns
        )
        cursor.execute(statement, params)
        count += 1
    return count
def main() -> None:
    """Entry point: load the JSON file named on argv and write it to Postgres.

    Exits with status 1 on bad usage or a missing input file.
    """
    if len(sys.argv) != 2:
        print("Usage: uv run write_from_json.py <path-to-json-file>")
        raise SystemExit(1)
    json_path = Path(sys.argv[1])
    if not (json_path.exists() and json_path.is_file()):
        print(f"JSON file not found: {json_path}")
        raise SystemExit(1)
    records = normalize_records(load_json(json_path))
    columns = collect_columns(records)
    schema = infer_schema(records, columns)
    # Pass only the expected keys so stray entries in DB_CONFIG cannot leak in.
    connect_kwargs = {
        key: DB_CONFIG[key]
        for key in ("user", "password", "host", "port", "database")
    }
    connection = psycopg2.connect(**connect_kwargs)
    try:
        # `with connection` wraps the work in one transaction (commit on
        # success, rollback on error); the cursor context closes the cursor.
        with connection, connection.cursor() as cursor:
            ensure_table(cursor, schema)
            inserted = insert_records(cursor, columns, schema, records)
        print(f"Inserted {inserted} row(s) from {json_path} into '{TABLE_NAME}'.")
    finally:
        connection.close()
if __name__ == "__main__":
    try:
        main()
    except json.JSONDecodeError as exc:
        # Must precede the ValueError clause: JSONDecodeError subclasses it.
        print(f"Invalid JSON: {exc}")
        raise SystemExit(1)
    except ValueError as exc:
        print(exc)
        raise SystemExit(1)
    except Exception as exc:
        # psycopg2.Error is itself an Exception subclass, so the original
        # `except (Exception, Error)` tuple was redundant; a single clause
        # covers database and unexpected failures alike.
        print(f"Error while writing JSON to PostgreSQL: {exc}")
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment