Forked from mlapida/CloudWatchLogsKinesisFirehose-Lambda.py
Created
September 14, 2017 05:17
-
-
Save codevally/028b0cf686654e1a788eecdcd8659531 to your computer and use it in GitHub Desktop.
A short Lambda function that can be sent CloudWatch Logs (in this case, Flow Logs) and forwards them to Kinesis Firehose for storage in S3. A full writeup can be found on my site: http://mlapida.com/thoughts/exporting-cloudwatch-logs-to-s3-lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import gzip
import io
import json
import logging
import time

import boto3

try:
    # Python 2 module; kept so existing references to StringIO keep working.
    from StringIO import StringIO
except ImportError:
    # Python 3 has no StringIO module; io provides the equivalent class.
    from io import StringIO
# Root logger; raised to INFO so informational messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Firehose client created once at import time and shared by every call to
# SendToFireHose in this module.
client = boto3.client('firehose')
def lambda_handler(event, context):
    """Relay VPC Flow Log events from CloudWatch Logs to Kinesis Firehose.

    The payload in ``event['awslogs']['data']`` is base64-decoded, gunzipped
    and parsed as JSON. Each entry of ``logEvents`` becomes one Firehose
    record whose ``Data`` is the CSV line ``start,dstaddr,srcaddr,packets``
    terminated by a newline. Records are passed to ``SendToFireHose`` in
    batches of at most 500.

    Args:
        event: Lambda event carrying the CloudWatch Logs payload under
            ``event['awslogs']['data']``.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        KeyError: if the event has no ``awslogs``/``data`` entry or the
            decoded payload has no ``logEvents`` key.
    """
    # Name of the Kinesis Firehose delivery stream.
    firehoseName = 'FlowLogTest'
    # Limit of 500 records per batch. Break it up if you have to.
    maxBatchSize = 500
    # Fields in FlowLogs - [version, accountid, interfaceid, srcaddr, dstaddr,
    # srcport, dstport, protocol, packets, bytes, start, stop, action,
    # logstatus]; only these are forwarded, in this order.
    wantedFields = ('start', 'dstaddr', 'srcaddr', 'packets')

    # Capture, decode and unzip the CloudWatch log data. base64.b64decode and
    # io.BytesIO work on both Python 2 and 3, whereas str.decode('base64')
    # and the StringIO module exist only on Python 2.
    zipped = base64.b64decode(str(event['awslogs']['data']))
    outEvent = gzip.GzipFile(fileobj=io.BytesIO(zipped)).read()

    # Convert the log data from JSON into a dictionary.
    cleanEvent = json.loads(outEvent.decode('utf-8'))

    # Control messages carry no flow log fields, so there is nothing to send.
    if cleanEvent.get('messageType') == 'CONTROL_MESSAGE':
        logger.info("Skipping CloudWatch Logs control message")
        return

    s = []
    skipped = 0

    # Loop through the events line by line.
    for t in cleanEvent['logEvents']:
        fields = t.get('extractedFields')
        try:
            line = ",".join(str(fields[name]) for name in wantedFields) + "\n"
        except (KeyError, TypeError):
            # No extracted fields (or an incomplete set): skip this entry
            # instead of failing the whole invocation part-way through.
            skipped += 1
            continue

        # Transform the data and store it in the "Data" field.
        s.append({'Data': line})

        # Send a full batch to Firehose in bulk, then start a new one.
        if len(s) >= maxBatchSize:
            SendToFireHose(firehoseName, s)
            s = []

    # When done, send whatever is left to Firehose in bulk.
    if s:
        SendToFireHose(firehoseName, s)

    if skipped:
        logger.warning(
            "Skipped %d log events without the expected extracted fields",
            skipped)
#function to send record to Kinesis Firehose
def SendToFireHose(streamName, records):
    """Write a batch of records to a Firehose delivery stream.

    ``put_record_batch`` can succeed as a call while rejecting individual
    records; the response then reports them via ``FailedPutCount`` and an
    ``ErrorCode`` entry in ``RequestResponses``. Rejected records are resent
    a bounded number of times with a short backoff. Records still rejected
    after the last attempt are reported through the module logger rather
    than raised, so the caller's remaining batches are still processed.

    Args:
        streamName: Name of the Firehose delivery stream.
        records: List of ``{'Data': ...}`` dictionaries to write.

    Returns:
        None.
    """
    if not records:
        # Nothing to write; avoid an API call with an empty batch.
        return

    maxAttempts = 3
    pending = records
    written = 0

    for attempt in range(maxAttempts):
        response = client.put_record_batch(
            DeliveryStreamName=streamName,
            Records=pending
        )
        failedCount = response.get('FailedPutCount', 0)
        written += len(pending) - failedCount
        if not failedCount:
            pending = []
            break

        # RequestResponses is ordered like the request, so pairing the two
        # lists identifies exactly which records were rejected.
        pending = [
            record
            for record, result in zip(pending, response['RequestResponses'])
            if 'ErrorCode' in result
        ]
        if not pending:
            break
        if attempt < maxAttempts - 1:
            # Brief exponential backoff before resending rejected records.
            time.sleep(0.1 * (2 ** attempt))

    if pending:
        logger.error(
            "Firehose rejected %d records for stream %s after %d attempts",
            len(pending), streamName, maxAttempts)

    #log the number of data points written to Kinesis
    print("Wrote the following records to Firehose: " + str(written))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment