Skip to content

Instantly share code, notes, and snippets.

@perryism
Last active August 21, 2019 15:58
Show Gist options
  • Save perryism/5a191b4d80d0f717ed26cc79df60cc64 to your computer and use it in GitHub Desktop.
Save perryism/5a191b4d80d0f717ed26cc79df60cc64 to your computer and use it in GitHub Desktop.
firehose json record decorator
class Record:
def __init__(self, record):
self.record = record
def decoded_data(self):
return base64.b64decode(self.record['data'])
def merge(self, new_data):
return {
'recordId': self.record['recordId'],
'result': 'Ok',
'data': self.encode_data(new_data)
}
def encode_data(self, new_data):
return base64.encodebytes(new_data.encode()).decode()
class JsonRecord(Record):
def decoded_data(self):
return json.loads(super().decoded_data())
def encode_data(self, new_data):
data = json.dumps(new_data)
return super().encode_data(data)
def firehose_json(func):
def wrapper(record):
jrecord = JsonRecord(record)
raw_payload = func(jrecord.decoded_data())
return jrecord.merge(raw_payload)
return wrapper
@firehose_json
def transform(record):
#do the fancy stuff here
return record
def record_iterator(event, func):
results = []
for record in event['records']:
transformed = func(record)
if transformed:
results.append(transformed)
return {'records': results}
def lambda_handler(event, context):
return record_iterator(event, transform)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment