Last active
September 12, 2018 09:46
-
-
Save keithrozario/729ccdc1f4e139e01196fbd78a0baf0c to your computer and use it in GitHub Desktop.
Load JSONL into DynamoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import boto3 | |
import customConfig | |
import os | |
import argparse | |
import datetime | |
""" Loading JSONL file into DynamoDB, customConfig.table_name has the DynamoDB Table Name""" | |
import decimal | |
from boto3.dynamodb.types import DYNAMODB_CONTEXT | |
# Clear the Inexact and Rounded traps on boto3's DynamoDB decimal context so
# those two signals no longer raise exceptions during Decimal operations.
for _trap in (decimal.Inexact, decimal.Rounded):
    DYNAMODB_CONTEXT.traps[_trap] = 0
class ComplexDecoder(json.JSONDecoder):
    """JSON decoder subclass exposing a ``default`` stringification helper.

    NOTE(review): ``json.JSONDecoder`` has no ``default`` hook (that hook
    belongs to ``json.JSONEncoder``), so this method presumably only runs
    when it is called explicitly -- confirm whether an encoder was intended.
    """

    def default(self, obj):
        """Return ``str(obj)`` for dates, datetimes, floats and ints.

        Any other value, including ``None``, yields ``None``.
        """
        stringified_types = (datetime.datetime, datetime.date, float, int)
        if isinstance(obj, stringified_types):
            return str(obj)
        return None
def remove_nulls(d):
    """Return a copy of mapping ``d`` without ``None`` or empty-string values.

    Only the top level of ``d`` is filtered; nested containers are kept
    unchanged.
    """
    cleaned = {}
    for key, value in d.items():
        if value is None:
            continue
        if value != '':
            cleaned[key] = value
    return cleaned
def string_it(d):
    """Return the ``str`` representation of ``d``."""
    text = str(d)
    return text
def parse_jsonl_row(row, object_hook=None):
    """Parse one line of a JSONL file into a dict suitable for DynamoDB.

    Args:
        row: A single line of text from a JSONL file.
        object_hook: Optional callable applied to every decoded JSON object
            (passed straight through to ``json.loads``).

    Returns:
        The decoded dict, or ``None`` when ``row`` is blank.

    Raises:
        ValueError: If the row is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError`` subclass) or does not decode to a JSON object.
    """
    text = row.strip()
    if not text:
        return None
    record = json.loads(
        text,
        object_hook=object_hook,
        # boto3's DynamoDB serializer rejects ``float``; Decimal is accepted.
        # Pass the callable itself: ``str()`` would pass '' which json
        # silently treats as "use the default parser".
        parse_float=decimal.Decimal,
        # NaN / Infinity / -Infinity have no DynamoDB number form; keep the
        # literal text instead.
        parse_constant=str,
    )
    if not isinstance(record, dict):
        raise ValueError(
            "expected a JSON object, got " + type(record).__name__)
    return record


if __name__ == "__main__":
    # Read in the directory name. The directory is mandatory: every later
    # step needs it, so fail early with a usage message instead of a
    # TypeError further down.
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--dir",
                        help="Directory(Folder) with the JSONL files",
                        required=True)
    args = parser.parse_args()

    # Logging setup: append to a log file and mirror messages to the console.
    # basicConfig cannot create the parent directory, so make sure it exists.
    os.makedirs('logs', exist_ok=True)
    logging.basicConfig(filename='logs/loadJson.log',
                        filemode='a',
                        level=logging.INFO,
                        format='%(asctime)s %(message)s',
                        datefmt='%m/%d/%Y %I:%M:%S %p')
    logger = logging.getLogger(__name__)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logger.addHandler(console)

    my_session = boto3.session.Session()
    my_region = my_session.region_name
    # Lazy %-formatting also copes with region_name being None.
    logger.info("Begin writing to %s:%s", my_region, customConfig.table_name)

    # Table setup is the same for every file, so do it once up front.
    dynamodb = boto3.resource('dynamodb')  # make sure ~/.aws/config has the region name
    table = dynamodb.Table(customConfig.table_name)

    # Loop through every file in the directory.
    for filename in os.listdir(args.dir):
        path = os.path.join(args.dir, filename)
        if not os.path.isfile(path):
            # Sub-directories and other non-files cannot be opened as JSONL.
            continue

        # Load each JSON line into a record and collect the records.
        json_records = []
        with open(path, 'r') as f:
            for line_number, row in enumerate(f, start=1):
                try:
                    record = parse_jsonl_row(row, object_hook=remove_nulls)
                except ValueError as err:
                    logger.warning("Skipping invalid row in %s:%d (%s)",
                                   filename, line_number, err)
                    continue
                if record is None:
                    logger.info("Empty Row in " + filename + ":" + str(line_number))
                    continue
                record['dateTimeAdded'] = datetime.datetime.now().isoformat()
                json_records.append(record)

        # Write to the table; the batch writer flushes when the block exits.
        with table.batch_writer() as batch:
            for written, record in enumerate(json_records, start=1):
                batch.put_item(Item=record)
                if written % 25 == 0:
                    logger.info("Wrote " + str(written) + " rows from " + filename)

        # Report the final count when it was not already logged above.
        total = len(json_records)
        if total % 25 != 0:
            logger.info("Wrote " + str(total) + " rows from " + filename)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment