Skip to content

Instantly share code, notes, and snippets.

@keithrozario
Last active September 12, 2018 09:46
Show Gist options
  • Save keithrozario/729ccdc1f4e139e01196fbd78a0baf0c to your computer and use it in GitHub Desktop.
Save keithrozario/729ccdc1f4e139e01196fbd78a0baf0c to your computer and use it in GitHub Desktop.
Load JSONL into DynamoDB
import json
import logging
import boto3
import customConfig
import os
import argparse
import datetime
""" Loading JSONL file into DynamoDB, customConfig.table_name has the DynamoDB Table Name"""
import decimal
from boto3.dynamodb.types import DYNAMODB_CONTEXT
# Relax the decimal context imported from boto3.dynamodb.types so that
# precision loss is tolerated instead of raising.
# Clear the Inexact trap: an inexact Decimal result no longer raises.
DYNAMODB_CONTEXT.traps[decimal.Inexact] = 0
# Clear the Rounded trap: a rounded Decimal result no longer raises.
DYNAMODB_CONTEXT.traps[decimal.Rounded] = 0
class ComplexDecoder(json.JSONDecoder):
    """JSON decoder subclass carrying a ``default`` stringification method.

    NOTE(review): ``default`` is the hook name used by ``json.JSONEncoder``;
    nothing in this file calls it, so this class looks unused -- confirm
    before relying on it.
    """

    def default(self, obj):
        """Return ``str(obj)`` for datetimes, dates, floats and ints.

        Any other value, including ``None``, yields ``None``.
        """
        stringable_types = (datetime.datetime, datetime.date, float, int)
        if isinstance(obj, stringable_types):
            return str(obj)
        # ``None`` and every unrecognised type both produce ``None``.
        return None
def remove_nulls(d):
    """Return a new dict holding the entries of ``d`` whose value is neither
    ``None`` nor the empty string.

    Other falsy values (``0``, ``False``, empty containers) are kept.
    """
    cleaned = {}
    for key, value in d.items():
        if value is not None and value != '':
            cleaned[key] = value
    return cleaned
def string_it(d):
    """Return the ``str()`` form of ``d``."""
    text = str(d)
    return text
def parse_jsonl_row(row, object_hook=None):
    """Parse one line of a JSONL file into a dict suitable for DynamoDB.

    Floats are parsed as ``decimal.Decimal`` because boto3's DynamoDB
    serializer does not accept Python ``float``. The constants ``NaN``,
    ``Infinity`` and ``-Infinity`` are kept as their literal strings.
    Integers stay ``int``.

    Args:
        row: One line of text from a JSONL file.
        object_hook: Optional callable applied to every decoded JSON object
            (passed straight through to ``json.loads``).

    Returns:
        The decoded dict, or ``None`` when the row is blank.

    Raises:
        ValueError: The row is not valid JSON, or its top-level value is not
            a JSON object.
    """
    if not row.strip():
        return None
    # The hooks must be callables. The previous ``str()`` evaluated to the
    # empty string, which json treats as "not supplied" and silently
    # replaces with its defaults (float / int).
    record = json.loads(row,
                        object_hook=object_hook,
                        parse_float=decimal.Decimal,
                        parse_constant=str)
    if not isinstance(record, dict):
        raise ValueError("JSONL row is not a JSON object")
    return record


if __name__ == "__main__":
    # read in directory name
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--dir",
                        help="Directory(Folder) with the JSONL files",
                        required=True)  # omitting it used to crash later with a TypeError
    args = parser.parse_args()

    # Logging setup (create the log directory so basicConfig cannot fail on it)
    os.makedirs('logs', exist_ok=True)
    logging.basicConfig(filename='logs/loadJson.log',
                        filemode='a',
                        level=logging.INFO,
                        format='%(asctime)s %(message)s',
                        datefmt='%m/%d/%Y %I:%M:%S %p')
    logger = logging.getLogger(__name__)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logger.addHandler(console)

    my_session = boto3.session.Session()
    my_region = my_session.region_name
    # %-style arguments tolerate a region of None (no region configured)
    logger.info("Begin writing to %s:%s", my_region, customConfig.table_name)

    # Table setup, done once for all files
    dynamodb = boto3.resource('dynamodb')  # make sure ~/.aws/config has the region name
    table = dynamodb.Table(customConfig.table_name)

    # Loop through every file in directory
    for filename in os.listdir(args.dir):
        path = os.path.join(args.dir, filename)
        if not os.path.isfile(path):
            # sub-directories cannot be opened as JSONL files
            continue

        # load each json row to a record, and append to json_records
        json_records = []
        with open(path, 'r') as f:
            for row_number, row in enumerate(f, start=1):
                try:
                    record = parse_jsonl_row(row, object_hook=remove_nulls)
                except ValueError:
                    logger.info("Invalid Row in %s:%d", filename, row_number)
                    continue
                if record is None:
                    logger.info("Empty Row in %s:%d", filename, row_number)
                    continue
                record['dateTimeAdded'] = datetime.datetime.now().isoformat()
                json_records.append(record)

        # write to table
        with table.batch_writer() as batch:
            for written, record in enumerate(json_records, start=1):
                batch.put_item(Item=record)
                if written % 25 == 0:
                    logger.info("Wrote %d rows from %s", written, filename)
        logger.info("Finished %s: %d rows written", filename, len(json_records))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment