
@skwashd
Last active January 13, 2025 16:30
Import Data from JSON File to Aurora PostgreSQL

This script loads data from a PostgreSQL JSON dump file into an Aurora PostgreSQL database using the RDS Data API.

Update the values in the COLUMN_TYPES dictionary to match your schema, mapping each column's data type to one of the supported Data API field types. To cast a string value to a custom type, append :type to the field type.
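As a rough illustration of the convention (the column names below are made up), the part before the colon selects the Data API field type, and the optional :type suffix becomes a ::type cast on the SQL placeholder:

```python
# Illustrative sketch of the ":type" cast convention in COLUMN_TYPES.
# "stringValue:vector" means: send the value as a stringValue field,
# then cast it in SQL with "::vector". Column names are examples only.
COLUMN_TYPES = {
    "text_val": "stringValue",          # plain text column, no cast
    "embedding": "stringValue:vector",  # string cast to a pgvector column
}

def placeholder(column: str, type_name: str) -> str:
    """Build the SQL placeholder for a column, adding a cast if requested."""
    if ":" in type_name:
        return f":{column}::{type_name.split(':')[1]}"
    return f":{column}"

for column, type_name in COLUMN_TYPES.items():
    print(placeholder(column, type_name))
# :text_val
# :embedding::vector
```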

Run Script

To run the script you need boto3 installed (pip install boto3). Then run the script like so:

python import.py <rds-arn> <db> <table> <secret-arn> /path/to/dump.json

The arguments should be self-explanatory, but here is a quick rundown:

  • rds-arn The ARN of your Aurora DB cluster
  • db The name of your database
  • table The table you want to import the dump into
  • secret-arn The ARN of the Secrets Manager secret that contains the credentials for your database
  • /path/to/dump.json The location of the JSON file you want to import

Here is a real example:

python import.py 'arn:aws:rds:us-east-1:012345678910:cluster:my-cluster' db my_table 'arn:aws:secretsmanager:us-east-1:012345678910:secret:rds!cluster-db62491b-79c9-4479-8da8-676df3098cc2'  /path/to/dump.json

Create Dump

To generate the dump run the following query:

COPY (SELECT ROW_TO_JSON(t)
  FROM (SELECT * FROM my_table) t)
  TO '/path/to/dump.json';

Note: Update the path to where you want the file written. COPY ... TO writes the file on the database server, so it needs appropriate server-side permissions; use psql's \copy instead if you want the file written on the client.
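Each line of the resulting file is one row serialised as a single JSON object, which is why the import script reads the dump line by line. A minimal sketch with a made-up row:

```python
import json

# Each line of the ROW_TO_JSON dump is one JSON object per row.
# This sample line is illustrative; your columns will differ.
line = '{"id": 1, "text_val": "hello", "int_val": 42}'
row = json.loads(line)
print(row["id"], row["text_val"])
```

Note that COPY's default text format doubles backslashes on output, which is why the import script collapses them back before calling json.loads.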

"""Load data from JSON dump into Aurora PostgreSQL DB using the data API."""
import json
import sys
from collections.abc import Generator
import boto3
rds = boto3.client("rds-data")
# Update this dict to define your columns
# Refer to https://docs.aws.amazon.com/rdsdataservice/latest/APIReference/API_Field.html for the types
# Use nativeType:custom_type to cast a value from a string to a custom type
COLUMN_TYPES = {
"text_val": "stringValue",
"int_val": "longValue",
"float_val": "doubleValue",
"custom_type": "stringValue:_mytype",
"embedding": "stringValue:vector",
}
def execute_statement(
rds_arn: str, database: str, sql: str, params: dict[str, any], secret_arn: str
) -> dict[str, any]:
"""
Execute the SQL statement.
Args:
----
database: The database to use.
rds_arn: The ARN of the resource.
sql: The SQL statement to execute.
params: The parameters for the SQL statement.
secret_arn: The ARN of the secret.
"""
return rds.execute_statement(
database=database,
resourceArn=rds_arn,
secretArn=secret_arn,
sql=sql,
parameters=params,
)
def generate_sql_statement(table: str) -> str:
"""
Generate the SQL statement to insert the data.
Args:
----
table: The table to insert the data into.
Returns:
-------
The SQL statement.
"""
columns = ", ".join(COLUMN_TYPES.keys())
tokens = []
for token, type_name in COLUMN_TYPES.items():
if ":" in type_name:
tokens.append(f":{token}::{type_name.split(':')[1]}")
continue
tokens.append(f":{token}")
return f"INSERT INTO {table}({columns}) VALUES ({', '.join(tokens)});" # noqa: S608 Values from a trusted source
def prepare_params(row: dict[str, any]) -> dict[str, any]:
"""
Prepare the row for insertion into the database.
Args:
----
row: The row to prepare.
Returns:
-------
The prepared row.
"""
processed = []
for key, value in row.items():
type_name = COLUMN_TYPES[key]
if value is None:
processed.append(
{
"name": key,
"value": {"isNull": True},
}
)
continue
if ":" in type_name:
type_name = type_name.split(":")[0]
processed.append(
{
"name": key,
"value": {type_name: value},
}
)
return processed
def read_records(path_to_file: str) -> Generator[dict[str, any], None, None]:
"""
Read the records from the file.
Args:
----
path_to_file: The path to the file.
Returns:
-------
The records.
"""
with open(path_to_file) as f:
while row := f.readline():
yield json.loads(row.replace("\\\\", "\\"))
def main(
rds_arn: str, database: str, table: str, secret_arn: str, path_to_file: str
) -> None:
"""
Load the data into the database.
Args:
----
rds_arn: The ARN of the resource.
database: The database to use.
table: The table to insert the data into.
secret_arn: The ARN of the secret.
path_to_file: The path to the file.
"""
sql = generate_sql_statement(table)
print(sql)
for record in read_records(path_to_file):
params = prepare_params(record)
try:
execute_statement(rds_arn, database, sql, params, secret_arn)
print(f"Inserted record with ID {record['id']}")
except Exception as e: # noqa: BLE001 This is a simple script, we can catch all exceptions
print(f"Failed to insert record with ID {record['id']}: {e}")
if __name__ == "__main__":
main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])