import boto3
import logging
import json
import gzip
from StringIO import StringIO

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('firehose')

def lambda_handler(event, context):
    #capture the CloudWatch log data
    outEvent = str(event['awslogs']['data'])

    #decode and unzip the log data
    outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64', 'strict'))).read()

    #convert the log data from JSON into a dictionary
    cleanEvent = json.loads(outEvent)

    #initiate a list
    s = []

    #set the name of the Kinesis Firehose Stream
    firehoseName = 'FlowLogTest'

    #loop through the events line by line
    for t in cleanEvent['logEvents']:
        #Transform the data and store it in the "Data" field.
        p = {
            #Fields in FlowLogs - [version, accountid, interfaceid, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, stop, action, logstatus]
            'Data': str(t['extractedFields']['start']) + "," + str(t['extractedFields']['dstaddr']) + "," + str(t['extractedFields']['srcaddr']) + "," + str(t['extractedFields']['packets']) + "\n"
        }

        #write the record to our list
        s.append(p)

        #limit of 500 records per batch. Break it up if you have to.
        if len(s) > 499:
            #send the records to Firehose in bulk
            SendToFireHose(firehoseName, s)

            #empty the list
            s = []

    #when done, send any remaining records to Firehose in bulk
    if len(s) > 0:
        SendToFireHose(firehoseName, s)

#function to send records to Kinesis Firehose
def SendToFireHose(streamName, records):
    response = client.put_record_batch(
        DeliveryStreamName=streamName,
        Records=records
    )

    #log the number of data points written to Kinesis
    print "Wrote the following records to Firehose: " + str(len(records))
Thanks @mlapida for the example. I left everything as JSON for the logs, since Logstash and Splunk know how to parse JSON log lines:
s.append({'Data': "{d}\n".format(d=json.dumps(t['extractedFields']))})
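For context, a sketch of how that one-liner would slot into the original loop (same handler as above, only the record construction changes):

for t in cleanEvent['logEvents']:
    #ship the whole parsed record as one JSON line instead of a CSV subset
    s.append({'Data': "{d}\n".format(d=json.dumps(t['extractedFields']))})
    if len(s) > 499:
        SendToFireHose(firehoseName, s)
        s = []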
This was extremely helpful for figuring out how to properly parse the CloudWatch flow log. Thanks!
@mlapida - Thank you for this example. I am trying to demo it but running into some issues. Namely, I am getting the following error, and given that I'm a noob with Python, I can't seem to figure out how to resolve it.
I'm not using this example for VPC Flow Logs; instead, I'm streaming Java application logs from Beanstalk to CloudWatch and, from there, streaming the CloudWatch log group to the Lambda function via a stream subscription.
I was hoping someone could help me understand the issue and point me in the right direction.
initial_value must be unicode or None, not str: TypeError
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 18, in lambda_handler
outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64','strict'))).read()
TypeError: initial_value must be unicode or None, not str
For anyone who wants to use this snippet with Python 3, the following decoding and unzipping code worked for me:
import base64
import gzip
import io
import json

# capture the CloudWatch log data
out_event = str(event['awslogs']['data'])
# decode and unzip the log data
decoded = base64.b64decode(out_event)
file = io.BytesIO(decoded)
out_event = gzip.GzipFile(fileobj=file).read()
# convert the log data from JSON into a dictionary
clean_event = json.loads(out_event)
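A slightly shorter variant, assuming a Python 3.2+ runtime where gzip.decompress is available, skips the BytesIO wrapper entirely:

import base64
import gzip
import json

# base64-decode, gunzip, and parse the subscription payload in one pass
payload = base64.b64decode(event['awslogs']['data'])
clean_event = json.loads(gzip.decompress(payload).decode('utf-8'))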
@cshanes please share the final code
It would be nice if you could include all the fields in the 'Data' record. You cover only 4, but the logs are more verbose than that. As a newbie, I wouldn't know how to make sense of these. Thanks for the write-up.
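For reference, a sketch of a 'Data' record that carries every field listed in the gist's header comment; the key names are an assumption and depend on the subscription filter pattern you configured:

#field names assumed to match the filter pattern from the header comment;
#adjust them to whatever your CloudWatch subscription filter actually extracts
fields = ['version', 'accountid', 'interfaceid', 'srcaddr', 'dstaddr',
          'srcport', 'dstport', 'protocol', 'packets', 'bytes',
          'start', 'stop', 'action', 'logstatus']
p = {'Data': ",".join(str(t['extractedFields'][f]) for f in fields) + "\n"}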