Skip to content

Instantly share code, notes, and snippets.

@laughingman7743
Last active March 14, 2016 04:19
Show Gist options
  • Save laughingman7743/406a508d76f87f6181ec to your computer and use it in GitHub Desktop.
Save laughingman7743/406a508d76f87f6181ec to your computer and use it in GitHub Desktop.
{
"Records": [
{
"eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
"eventVersion": "1.0",
"kinesis": {
"partitionKey": "partitionKey-3",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
"kinesisSchemaVersion": "1.0",
"sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
},
"invokeIdentityArn": "arn:aws:iam::EXAMPLE",
"eventName": "aws:kinesis:record",
"eventSourceARN": "arn:aws:kinesis:EXAMPLE",
"eventSource": "aws:kinesis",
"awsRegion": "us-east-1"
}
]
}
{
"name": "YOUR_LAMBDA_NAME",
"description": "YOUR_LAMBDA_DESCRIPTION",
"region": "YOUR_REGION",
"handler": "lambda_function.lambda_handler",
"role": "arn:aws:iam::YOUR_ACCOUNT:role/YOUR_ROLE_NAME",
"timeout": 300,
"memory": 256
}
import base64
import gzip
import logging
import os
import shutil
import uuid
from datetime import datetime

import boto3
# Configure the root logger (no name passed to getLogger) to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Deployment placeholders: replace the YOUR_* / PREFIX values before use.
# Region handed to the boto3 S3 client.
REGION_NAME = 'YOUR_REGION'
# Destination bucket for the uploaded gzip archives.
S3_BUCKET = 'YOUR_BUCKET'
# Leading segment of every uploaded object key.
S3_PREFIX = 'PREFIX/UPLOAD/FILE'
# Used as the first component of each generated gzip file name.
STREAM_NAME = 'YOUR_STREAM_NAME'
# Parent directory under which per-invocation working directories are created.
TMP_DIR = '/tmp'
def create_tmp_dir(timestamp):
    """Create a working directory under TMP_DIR named after *timestamp*.

    The directory name is the timestamp rendered as ``YYYYmmddHHMMSSffffff``
    (microsecond resolution).

    :param timestamp: object providing ``strftime`` (a ``datetime``).
    :return: path of the newly created directory.
    :raises OSError: if the directory cannot be created (e.g. it already
        exists), as raised by ``os.mkdir``.
    """
    stamp = timestamp.strftime('%Y%m%d%H%M%S%f')
    target = os.path.join(TMP_DIR, stamp)
    os.mkdir(target)
    return target
def create_gzip_file(events, timestamp, tmp_dir):
    """Write *events* to a new gzip file in *tmp_dir*, one event per line.

    The file is named ``<STREAM_NAME>-<YYYYmmddHH>-<uuid4>.gz``.

    :param events: iterable of payloads; ``bytes`` are written as-is and
        text strings are encoded as UTF-8 first.
    :param timestamp: object providing ``strftime`` (a ``datetime``); its
        hour-resolution rendering is embedded in the file name.
    :param tmp_dir: existing directory in which the file is created.
    :return: full path of the gzip file that was written.
    """
    file_name = '{0}-{1}-{2}.gz'.format(
        STREAM_NAME, timestamp.strftime('%Y%m%d%H'), uuid.uuid4())
    gzip_file = os.path.join(tmp_dir, file_name)
    with gzip.open(gzip_file, 'wb') as out:
        for line in events:
            # The file is opened in binary mode, so everything written must
            # be bytes. Decoded Kinesis payloads already are; text is encoded
            # so that appending the newline never mixes bytes and str.
            if not isinstance(line, bytes):
                line = line.encode('utf-8')
            out.write(line + b'\n')
    return gzip_file
def put_object(events):
    """Gzip *events* into a temporary file and upload it to S3.

    The object key is ``<S3_PREFIX>/<YYYY>/<mm>/<dd>/<HH>/<gzip file name>``,
    using the current UTC time. The temporary working directory and its
    contents are removed afterwards, whether or not the upload succeeded.

    :param events: iterable of payloads, passed to ``create_gzip_file``.
    :raises Exception: any error from file creation or the S3 upload is
        propagated to the caller after cleanup.
    """
    timestamp = datetime.utcnow()
    tmp_dir = create_tmp_dir(timestamp)
    try:
        gzip_file = create_gzip_file(events, timestamp, tmp_dir)
        key = '/'.join([S3_PREFIX,
                        timestamp.strftime('%Y/%m/%d/%H'),
                        os.path.basename(gzip_file)])
        s3_client = boto3.client('s3', region_name=REGION_NAME)
        # Open the archive in a context manager so the handle is closed
        # even when the upload raises.
        with open(gzip_file, 'rb') as body:
            s3_client.put_object(Bucket=S3_BUCKET, Key=key, Body=body)
        logger.info('Upload {} completed.'.format(gzip_file))
    finally:
        # Remove the whole per-call directory (not just the file) so nothing
        # accumulates under TMP_DIR across invocations or after failures.
        shutil.rmtree(tmp_dir, ignore_errors=True)
def decode_event(event):
    """Return the base64-decoded Kinesis payload of every record in *event*.

    :param event: mapping with a ``'Records'`` list whose items carry the
        encoded payload at ``['kinesis']['data']``.
    :return: list of decoded payloads, in record order.
    """
    return [
        base64.b64decode(entry['kinesis']['data'])
        for entry in event['Records']
    ]
def main(event):
    """Decode the records in *event*, upload them, and report the count.

    :param event: mapping with a ``'Records'`` list, as consumed by
        ``decode_event``.
    :return: summary message containing the number of records in *event*.
    """
    put_object(decode_event(event))
    record_count = len(event['Records'])
    return 'Successfully processed {} records.'.format(record_count)
def lambda_handler(event, context):
    """Lambda entry point: process *event* and log any failure.

    :param event: invocation payload, forwarded to ``main``.
    :param context: Lambda context object; not used here.
    :return: the summary message returned by ``main``.
    :raises Exception: any error from ``main`` is logged with its traceback
        and then re-raised unchanged.
    """
    try:
        return main(event)
    except Exception:
        logger.exception('Failed to process lambda function.')
        # Bare ``raise`` re-raises the active exception with its original
        # traceback intact.
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment