|
"""Load data from JSON dump into Aurora PostgreSQL DB using the data API.""" |
|
|
|
import json
import sys
from collections.abc import Generator, Mapping
from typing import Any, Optional

import boto3
|
|
|
# RDS Data API client shared by execute_statement; created once when the module is imported.
rds = boto3.client(service_name="rds-data")
|
|
|
# Target-table columns, mapped to the RDS Data API Field member that carries each value.
# Edit this mapping to match your table. The insertion order here is the column order
# used in the generated INSERT statement, and every key present in an input record
# must have an entry here.
# Available field types: https://docs.aws.amazon.com/rdsdataservice/latest/APIReference/API_Field.html
# To send a value as one field type and cast it to another type in SQL, write
# "<fieldType>:<sql_type>"; e.g. "stringValue:vector" yields the placeholder ":embedding::vector".
COLUMN_TYPES = {
    "text_val": "stringValue",
    "int_val": "longValue",
    "float_val": "doubleValue",
    "custom_type": "stringValue:_mytype",  # sent as a string, cast to _mytype
    "embedding": "stringValue:vector",  # sent as a string, cast to vector
}
|
|
|
|
|
def execute_statement(
    rds_arn: str,
    database: str,
    sql: str,
    params: list[dict[str, Any]],
    secret_arn: str,
) -> dict[str, Any]:
    """
    Execute a single SQL statement through the RDS Data API.

    Args:
    ----
        rds_arn: The ARN of the database cluster to run the statement against.
        database: The name of the database to use.
        sql: The SQL statement to execute; may contain ``:name`` placeholders.
        params: The statement parameters, as built by ``prepare_params``: a list of
            ``{"name": <placeholder>, "value": {<fieldType>: <value>}}`` dicts.
        secret_arn: The ARN of the secret that grants access to the cluster.

    Returns:
    -------
        The raw response returned by the client's ``execute_statement`` call.

    """
    return rds.execute_statement(
        resourceArn=rds_arn,
        secretArn=secret_arn,
        database=database,
        sql=sql,
        parameters=params,
    )
|
|
|
|
|
def generate_sql_statement(
    table: str, column_types: Optional[Mapping[str, str]] = None
) -> str:
    """
    Generate the parameterized INSERT statement for ``table``.

    Every column becomes a ``:name`` placeholder. A column whose type is written
    as ``"<fieldType>:<sql_type>"`` gets a ``::<sql_type>`` cast appended to its
    placeholder (e.g. ``:embedding::vector``), so the value sent as ``fieldType``
    is converted by the database.

    Args:
    ----
        table: The table to insert the data into. It is interpolated into the SQL
            verbatim, so it must come from a trusted source.
        column_types: Mapping of column name to field type. Defaults to the
            module-level ``COLUMN_TYPES``; its order defines the column order.

    Returns:
    -------
        The SQL statement.

    Raises:
    ------
        ValueError: If no columns are defined (the statement would be invalid SQL).

    """
    if column_types is None:
        column_types = COLUMN_TYPES
    if not column_types:
        raise ValueError("No columns defined; add at least one entry to COLUMN_TYPES")

    columns = ", ".join(column_types)
    placeholders = []
    for column, type_name in column_types.items():
        # "stringValue:vector" -> cast "vector"; plain "stringValue" -> no cast.
        _, has_cast, cast = type_name.partition(":")
        placeholders.append(f":{column}::{cast}" if has_cast else f":{column}")

    return f"INSERT INTO {table}({columns}) VALUES ({', '.join(placeholders)});"  # noqa: S608 Values from a trusted source
|
|
|
|
|
def prepare_params(
    row: dict[str, Any], column_types: Optional[Mapping[str, str]] = None
) -> list[dict[str, Any]]:
    """
    Convert a record into the parameter list expected by the RDS Data API.

    Each key/value pair becomes ``{"name": key, "value": {fieldType: value}}``,
    where ``fieldType`` is the part of the column's type before any ``:`` cast
    suffix. ``None`` values become ``{"isNull": True}``.

    Args:
    ----
        row: The record to convert, keyed by column name.
        column_types: Mapping of column name to field type. Defaults to the
            module-level ``COLUMN_TYPES``.

    Returns:
    -------
        One parameter dict per key in ``row``, in the row's key order.

    Raises:
    ------
        KeyError: If ``row`` has a key with no entry in the column mapping
            (checked even when the value is ``None``).

    """
    if column_types is None:
        column_types = COLUMN_TYPES

    params = []
    for key, value in row.items():
        try:
            type_name = column_types[key]
        except KeyError:
            raise KeyError(f"column {key!r} has no entry in COLUMN_TYPES") from None

        if value is None:
            field = {"isNull": True}
        else:
            # Drop the SQL cast suffix: only the Data API field type goes here.
            field = {type_name.partition(":")[0]: value}
        params.append({"name": key, "value": field})

    return params
|
|
|
|
|
def read_records(path_to_file: str) -> Generator[dict[str, Any], None, None]:
    """
    Lazily read newline-delimited JSON records from a file.

    Before parsing, each pair of consecutive backslashes in a line is collapsed
    into a single backslash. This presumably undoes an extra level of escaping
    added by the tool that produced the dump; TODO confirm against the exporter.
    Blank and whitespace-only lines are skipped.

    Args:
    ----
        path_to_file: The path to the UTF-8 encoded file.

    Yields:
    ------
        One parsed JSON value per non-blank line. Each value is assumed to be a
        dict keyed by column name; this is not validated here.

    Raises:
    ------
        json.JSONDecodeError: If a non-blank line is not valid JSON.

    """
    # JSON text is UTF-8; don't depend on the platform's default encoding.
    with open(path_to_file, encoding="utf-8") as f:
        for line in f:
            # Tolerate empty lines (e.g. a trailing newline) instead of crashing.
            if not line.strip():
                continue
            yield json.loads(line.replace("\\\\", "\\"))
|
|
|
|
|
def main(
    rds_arn: str, database: str, table: str, secret_arn: str, path_to_file: str
) -> None:
    """
    Load every record from ``path_to_file`` into ``table``, one INSERT per record.

    Prints the generated SQL first. After that it prints one line per record:
    successes go to stdout, and failures go to stderr together with the
    exception. A failing record does not stop the remaining records from loading.

    Args:
    ----
        rds_arn: The ARN of the database cluster.
        database: The database to use.
        table: The table to insert the data into.
        secret_arn: The ARN of the secret that grants access to the cluster.
        path_to_file: The path to the newline-delimited JSON file.

    """
    sql = generate_sql_statement(table)
    print(sql)

    for record in read_records(path_to_file):
        # Use .get so a record without an "id" key only degrades the log line
        # instead of raising inside the except handler and aborting the load.
        record_id = record.get("id")
        try:
            # Build params inside the try so an unknown column in one record is
            # reported as a failure for that record rather than ending the run.
            params = prepare_params(record)
            execute_statement(rds_arn, database, sql, params, secret_arn)
        except Exception as e:  # noqa: BLE001 This is a simple script, we can catch all exceptions
            print(f"Failed to insert record with ID {record_id}: {e}", file=sys.stderr)
        else:
            print(f"Inserted record with ID {record_id}")
|
|
|
|
|
if __name__ == "__main__":
    # Fail with a usage message instead of an IndexError traceback when
    # arguments are missing; extra trailing arguments are ignored as before.
    if len(sys.argv) < 6:
        print(
            f"Usage: {sys.argv[0]} RDS_ARN DATABASE TABLE SECRET_ARN PATH_TO_FILE",
            file=sys.stderr,
        )
        sys.exit(2)
    main(*sys.argv[1:6])