A short Lambda function that can be sent CloudWatch Logs (in this case, Flow Logs) and sends them to Kinesis Firehose for storage in S3. A full writeup can be found on my site: http://mlapida.com/thoughts/exporting-cloudwatch-logs-to-s3-lambda
import boto3
import logging
import json
import gzip
from StringIO import StringIO

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('firehose')

def lambda_handler(event, context):
    # capture the CloudWatch log data
    outEvent = str(event['awslogs']['data'])

    # decode and unzip the log data
    outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64', 'strict'))).read()

    # convert the log data from JSON into a dictionary
    cleanEvent = json.loads(outEvent)

    # initiate a list
    s = []

    # set the name of the Kinesis Firehose stream
    firehoseName = 'FlowLogTest'

    # loop through the events line by line
    for t in cleanEvent['logEvents']:
        # transform the data and store it in the "Data" field
        p = {
            # Fields in Flow Logs - [version, accountid, interfaceid, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, stop, action, logstatus]
            'Data': str(t['extractedFields']['start']) + "," + str(t['extractedFields']['dstaddr']) + "," + str(t['extractedFields']['srcaddr']) + "," + str(t['extractedFields']['packets']) + "\n"
        }

        # append the record to our list
        s.append(p)

        # limit of 500 records per batch; break it up if you have to
        if len(s) > 499:
            # send the records to Firehose in bulk
            SendToFireHose(firehoseName, s)

            # empty the list
            s = []

    # when done, send any remaining records to Firehose in bulk
    if len(s) > 0:
        SendToFireHose(firehoseName, s)

# function to send records to Kinesis Firehose
def SendToFireHose(streamName, records):
    response = client.put_record_batch(
        DeliveryStreamName=streamName,
        Records=records
    )

    # log the number of data points written to Kinesis
    print "Wrote the following records to Firehose: " + str(len(records))
For anyone who wants to use this snippet with Python 3, the following decoding and unzipping code worked for me:
import base64
import gzip
import io
import json

# capture the CloudWatch log data
out_event = str(event['awslogs']['data'])
# decode and unzip the log data
decoded = base64.b64decode(out_event)
file = io.BytesIO(decoded)
out_event = gzip.GzipFile(fileobj=file).read()
# convert the log data from JSON into a dictionary
clean_event = json.loads(out_event)
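Putting that decode step together with the rest of the original handler, a complete Python 3 version might look like the sketch below (untested; the stream name 'FlowLogTest' and the selected fields follow the original gist):

    import base64
    import gzip
    import io
    import json

    import boto3

    client = boto3.client('firehose')
    firehose_name = 'FlowLogTest'  # stream name from the original gist

    def lambda_handler(event, context):
        # decode and unzip the CloudWatch log payload
        decoded = base64.b64decode(event['awslogs']['data'])
        out_event = gzip.GzipFile(fileobj=io.BytesIO(decoded)).read()
        clean_event = json.loads(out_event)

        records = []
        for t in clean_event['logEvents']:
            fields = t['extractedFields']
            line = ','.join(str(fields[k]) for k in ('start', 'dstaddr', 'srcaddr', 'packets')) + '\n'
            records.append({'Data': line.encode('utf-8')})

            # Firehose accepts at most 500 records per batch
            if len(records) >= 500:
                client.put_record_batch(DeliveryStreamName=firehose_name, Records=records)
                records = []

        # send any remaining records
        if records:
            client.put_record_batch(DeliveryStreamName=firehose_name, Records=records)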
@cshanes please share the final code
@mlapida - Thank you for this example. I am trying to demo it but running into some issues. Namely, I am getting the following error, and since I'm a Python noob, I can't seem to figure out how to resolve it.
I'm not using this example for VPC Flow Logs; instead, I'm streaming Java application logs from Beanstalk to CloudWatch, and from there I'm streaming the CloudWatch log group to the Lambda function via a stream subscription.
Hoping someone can help me understand the issue and point me in the right direction.
initial_value must be unicode or None, not str: TypeError
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 18, in lambda_handler
outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64','strict'))).read()
TypeError: initial_value must be unicode or None, not str
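The traceback shows that StringIO was handed a byte string where it expected text; the Python 3 snippet above (base64.b64decode into io.BytesIO) sidesteps that by keeping the payload as bytes throughout. One more caveat for this use case: plain application log events carry a message field rather than extractedFields, so the record-building loop needs adjusting, roughly like this (a sketch, assuming the surrounding handler matches the Python 3 version above):

    # application logs from a subscription filter with no extraction pattern
    # expose each line as 'message' instead of 'extractedFields'
    for t in clean_event['logEvents']:
        records.append({'Data': (t['message'] + '\n').encode('utf-8')})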