Last active
April 16, 2021 14:03
-
-
Save ervinne13/76ad8d1cd5c1ceb551880ded5b209180 to your computer and use it in GitHub Desktop.
Sample Lambda Function for Processing CSVs from an S3 Bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import environ | |
import boto3 | |
from mongodb import initialize_connection | |
from pandas import read_csv | |
from operations_writer import OperationsWriter | |
from worklogs_operations_generator import WorklogsOperationsGenerator | |
from worklogs_repository import WorklogsRepository | |
def lambda_handler(event, context):
    """Lambda entry point: turn the CSV named by an S3 event into operations.

    Only the first entry of ``event['Records']`` is processed. The CSV is
    loaded into a DataFrame, converted into create/update operations for the
    ``worklogs`` database, and the resulting batch is written inside a
    MongoDB session transaction.

    Args:
        event: Event payload; must contain a non-empty ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and ``body`` ``'Success'``.
    """
    s3_client = boto3.client('s3')
    assert_event_has_record(event)

    first_record = event['Records'][0]
    worklogs_frame = get_csv_contents_as_data_frame(first_record, s3_client)

    with initialize_connection() as client, client.start_session() as session:
        with session.start_transaction():
            database = client.worklogs

            # The repository and the writer share the same session so that
            # reads and the final insert go through the same transaction.
            writer = OperationsWriter(database, session)
            repository = WorklogsRepository(database, session)
            generator = WorklogsOperationsGenerator(repository)

            operations = generator.generate_from_data_frame(worklogs_frame)
            writer.write_batch(operations)

    return {'statusCode': 200, 'body': 'Success'}
def assert_event_has_record(event):
    """Fail fast unless the event carries at least one record.

    A missing ``Records`` key, a ``None`` value and an empty list are all
    treated the same way, so the caller always gets the descriptive error
    below rather than an incidental ``KeyError`` or ``TypeError``.

    Args:
        event: Mapping expected to hold a non-empty ``Records`` list.

    Raises:
        ValueError: If ``event`` has no records. ``ValueError`` is a subclass
            of ``Exception``, so handlers catching ``Exception`` still match.
    """
    if not event.get('Records'):
        raise ValueError("event must have a record about an s3 object.")
def get_csv_contents_as_data_frame(record, s3):
    """Read the CSV object referenced by an S3 event record into a DataFrame.

    Args:
        record: One entry of an S3 event's ``Records`` list; must provide
            ``record['s3']['bucket']['name']`` and
            ``record['s3']['object']['key']``.
        s3: S3 client supplied by the caller. It is not used here; the object
            is read through an ``s3://`` URL handed to ``read_csv``.

    Returns:
        The DataFrame produced by ``read_csv`` for the referenced object.
    """
    # Imported locally so this function does not depend on a file-level import.
    from urllib.parse import unquote_plus

    bucket_name = record['s3']['bucket']['name']
    # S3 event notifications deliver the object key URL-encoded (a space
    # arrives as '+', other characters as %XX), so decode it before building
    # the path; otherwise such keys point at a non-existent object.
    file_name = unquote_plus(record['s3']['object']['key'])
    return read_csv(f's3://{bucket_name}/{file_name}', sep=',')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import environ | |
from pymongo import MongoClient | |
def initialize_connection():
    """Build a ``MongoClient`` from the ``mongodb_conn_string`` env variable.

    Returns:
        The newly created ``MongoClient``.

    Raises:
        KeyError: If ``mongodb_conn_string`` is not set in the environment.
    """
    return MongoClient(environ['mongodb_conn_string'])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
class OperationsWriter:
    """Writes operation batches to the ``operations`` collection."""

    def __init__(self, mongo_db, mongo_sess):
        """Keep the database handle and the session used for every write.

        Args:
            mongo_db: Database object exposing an ``operations`` collection.
            mongo_sess: Session passed to each insert.
        """
        self.mongo_db = mongo_db
        self.mongo_sess = mongo_sess

    def write_batch(self, batch):
        """Insert ``batch`` as one document stamped with ``executed_at``.

        Args:
            batch: Mapping whose items become the document's fields. An
                existing ``executed_at`` entry is overwritten.
        """
        # Imported locally because the file only imports ``datetime`` itself.
        from datetime import timezone

        document = {
            **batch,
            # Use an aware UTC timestamp: a naive ``datetime.now()`` is local
            # time, which would be stored as if it were UTC and therefore be
            # wrong on any host whose timezone is not UTC.
            'executed_at': datetime.now(timezone.utc),
        }
        self.mongo_db.operations.insert_one(document, session=self.mongo_sess)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WorklogsOperationsGenerator:
    """Converts worklog rows of a DataFrame into create/update operations."""

    def __init__(self, worklogs_repository):
        """Store the repository used to tell existing ids from new ones."""
        self.worklogs_repository = worklogs_repository

    def generate_from_data_frame(self, df):
        """Build one operation per row and group them by action.

        Args:
            df: DataFrame with the columns ``id``, ``date``, ``project``,
                ``type``, ``minutesspent``, ``employee_id`` and
                ``display_name``.

        Returns:
            A dict with the keys ``updates`` and ``creates``, each holding
            the list of operations of that kind in row order.
        """
        unique_ids = set(df['id'].tolist())
        split_ids = self.worklogs_repository.split_id_set_by_new(unique_ids)

        buckets = {'update': [], 'create': []}
        for _, row in df.iterrows():
            operation = self.create_operation(split_ids, row)
            bucket = buckets.get(operation['action'])
            # Operations with any other action are left out of the result.
            if bucket is not None:
                bucket.append(operation)

        return {
            'updates': buckets['update'],
            'creates': buckets['create'],
        }

    def create_operation(self, split_ids, wrklog_row):
        """Describe the operation to perform for a single worklog row.

        Args:
            split_ids: Dict with the id sets ``existing`` and ``new``.
            wrklog_row: Row providing the worklog columns by name.

        Returns:
            A dict with ``resource``, ``worklog_id``, ``action`` and ``body``.
        """
        # Resolve the action first so an unknown id fails before the body
        # is assembled.
        action = self.get_action_or_fail(split_ids, wrklog_row)
        return {
            'resource': 'worklog',
            'worklog_id': wrklog_row['id'],
            'action': action,
            'body': {
                'id': wrklog_row['id'],
                'date': wrklog_row['date'],
                'project': wrklog_row['project'],
                'type': wrklog_row['type'],
                'mins_spent': wrklog_row['minutesspent'],
                'employee': {
                    'id': wrklog_row['employee_id'],
                    'name': wrklog_row['display_name'],
                },
            },
        }

    def get_action_or_fail(self, split_ids, worklog):
        """Return ``'update'`` or ``'create'`` for the worklog's id.

        Raises:
            Exception: If the id is in neither ``split_ids['existing']`` nor
                ``split_ids['new']``.
        """
        worklog_id = worklog['id']
        # 'existing' is checked before 'new', so it wins if an id is in both.
        for bucket_name, action in (('existing', 'update'), ('new', 'create')):
            if worklog_id in split_ids[bucket_name]:
                return action
        raise Exception(f'There was an issue in the split_ids when processing worklog {worklog_id}')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO: Look into inversion of control in Python and decouple MongoDB, or at
# least make this an implementation of an interface
class WorklogsRepository:
    """Reads worklog documents from the ``worklogs`` collection."""

    def __init__(self, mongo_db, mongo_sess):
        """Keep the database handle and the session used for every query."""
        self.mongo_db = mongo_db
        self.mongo_sess = mongo_sess

    # Warning! for testing purposes only!
    def print_all_worklogs(self):
        """Print every document of the ``worklogs`` collection."""
        cursor = self.mongo_db.worklogs.find({}, session=self.mongo_sess)
        for worklog in cursor:
            print(worklog)

    def split_id_set_by_new(self, id_set):
        """Partition ``id_set`` into ids already stored and ids not yet stored.

        Args:
            id_set: Set of worklog ids to look up.

        Returns:
            A dict with ``existing`` (ids found in the collection) and
            ``new`` (the remaining ids of ``id_set``).
        """
        query = {'id': {'$in': list(id_set)}}
        # Only the ``id`` field is needed to decide membership.
        projection = {'id': 1}
        matches = self.mongo_db.worklogs.find(
            query, projection, session=self.mongo_sess
        )
        stored_ids = {document['id'] for document in matches}
        return {
            'existing': stored_ids,
            'new': id_set - stored_ids,
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment