Skip to content

Instantly share code, notes, and snippets.

@bradmartin333
Last active March 19, 2026 11:48
Show Gist options
  • Select an option

  • Save bradmartin333/bf883d7bd9e4a204512ec4eea9d809f5 to your computer and use it in GitHub Desktop.

Select an option

Save bradmartin333/bf883d7bd9e4a204512ec4eea9d809f5 to your computer and use it in GitHub Desktop.
write json to Postgres db
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "psycopg2-binary",
# ]
# ///
import json
import sys
from pathlib import Path
from typing import Literal
import psycopg2
from psycopg2 import Error
from psycopg2 import sql
from psycopg2.extras import Json
from conf import DB_CONFIG
TABLE_NAME = "json_imports"
PgType = Literal["BOOLEAN", "BIGINT", "DOUBLE PRECISION", "TEXT", "JSONB"]
def load_json(path: Path):
with path.open("r", encoding="utf-8") as file:
return json.load(file)
def normalize_records(data) -> list[dict]:
if isinstance(data, dict):
return [data]
if isinstance(data, list) and all(isinstance(item, dict) for item in data):
return data
raise ValueError("JSON root must be an object or a list of objects")
def collect_columns(records: list[dict]) -> list[str]:
columns: list[str] = []
seen: set[str] = set()
for record in records:
for key in record:
if key not in seen:
seen.add(key)
columns.append(key)
return columns
def value_kind(value) -> str:
if value is None:
return "null"
if isinstance(value, bool):
return "bool"
if isinstance(value, int):
return "int"
if isinstance(value, float):
return "float"
if isinstance(value, (dict, list)):
return "json"
return "text"
def infer_column_type(records: list[dict], column: str) -> PgType:
kinds = {value_kind(record[column]) for record in records if column in record}
kinds.discard("null")
if not kinds:
return "TEXT"
if kinds == {"bool"}:
return "BOOLEAN"
if kinds == {"int"}:
return "BIGINT"
if kinds <= {"int", "float"}:
return "DOUBLE PRECISION"
if "json" in kinds:
return "JSONB"
return "TEXT"
def infer_schema(records: list[dict], columns: list[str]) -> dict[str, PgType]:
return {column: infer_column_type(records, column) for column in columns}
def to_db_value(value, pg_type: PgType):
if value is None:
return None
if pg_type == "JSONB":
if isinstance(value, (dict, list)):
return Json(value)
return Json(value)
if pg_type == "TEXT" and isinstance(value, (dict, list)):
return json.dumps(value)
return value
def ensure_table(cursor, schema: dict[str, PgType]) -> None:
cursor.execute(
sql.SQL(
"""
CREATE TABLE IF NOT EXISTS {} (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""
).format(sql.Identifier(TABLE_NAME))
)
for column, pg_type in schema.items():
cursor.execute(
sql.SQL("ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}").format(
sql.Identifier(TABLE_NAME),
sql.Identifier(column),
sql.SQL(pg_type),
)
)
def insert_records(
cursor, columns: list[str], schema: dict[str, PgType], records: list[dict]
) -> int:
if not columns:
return 0
column_sql = sql.SQL(", ").join(sql.Identifier(column) for column in columns)
placeholder_sql = sql.SQL(", ").join(sql.Placeholder() for _ in columns)
query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
sql.Identifier(TABLE_NAME),
column_sql,
placeholder_sql,
)
inserted = 0
for record in records:
row = tuple(
to_db_value(record.get(column), schema[column]) for column in columns
)
cursor.execute(query, row)
inserted += 1
return inserted
def main() -> None:
if len(sys.argv) != 2:
print("Usage: uv run write_from_json.py <path-to-json-file>")
raise SystemExit(1)
json_path = Path(sys.argv[1])
if not json_path.exists() or not json_path.is_file():
print(f"JSON file not found: {json_path}")
raise SystemExit(1)
payload = load_json(json_path)
records = normalize_records(payload)
columns = collect_columns(records)
schema = infer_schema(records, columns)
connection = psycopg2.connect(
user=DB_CONFIG["user"],
password=DB_CONFIG["password"],
host=DB_CONFIG["host"],
port=DB_CONFIG["port"],
database=DB_CONFIG["database"],
)
try:
with connection:
with connection.cursor() as cursor:
ensure_table(cursor, schema)
inserted = insert_records(cursor, columns, schema, records)
print(f"Inserted {inserted} row(s) from {json_path} into '{TABLE_NAME}'.")
finally:
connection.close()
if __name__ == "__main__":
try:
main()
except json.JSONDecodeError as exc:
print(f"Invalid JSON: {exc}")
raise SystemExit(1)
except ValueError as exc:
print(exc)
raise SystemExit(1)
except (Exception, Error) as exc:
print(f"Error while writing JSON to PostgreSQL: {exc}")
raise SystemExit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment