Created
November 19, 2018 09:52
-
-
Save bzcorn/28e4d9c96159f66c7084ea8b52be729f to your computer and use it in GitHub Desktop.
This is a Lambda function that responds to new CloudTrail or Config logs hitting an S3 bucket and forwards them to Sumologic. Some logs are >65 KB in size and need to be broken up before being sent to Sumologic. Package this (along with the requisite Python libs) and have it listen to ObjectCreated events in the S3 bucket that CloudTrail/Config logs are sent to.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import async_timeout | |
import gzip | |
import os | |
import json | |
import boto3 | |
import urllib.parse | |
import botocore | |
# Map of supported AWS log sources to their Sumologic collector settings.
#   "url":     Sumologic HTTP collector endpoint — fill in before deploying.
#   "keyword": top-level JSON key in each log file that holds the list of
#              individual log entries to be split out and forwarded.
services = {
    "CloudTrail": {
        "url": "",
        "keyword": "Records"
    },
    "Config": {
        "url": "",
        "keyword": "configurationItems"
    }
}
async def push_logs(session, url, data):
    """POST one gzip-compressed payload to the collector URL.

    Args:
        session: shared aiohttp ClientSession (caller sets the
            ``Content-Encoding: gzip`` header on it).
        url: Sumologic HTTP collector endpoint.
        data: gzip-compressed bytes for a single log entry.
    """
    # async_timeout.timeout is an async context manager; the bare `with`
    # form used originally is rejected by async-timeout 4.x at runtime.
    async with async_timeout.timeout(30):
        async with session.post(url, data=data) as response:
            print(response.status)
            print(await response.text())
async def main(loop="", url="", log_data=None):
    """Fan out every compressed log chunk to the collector concurrently.

    Args:
        loop: event loop to bind the ClientSession to.
        url: Sumologic HTTP collector endpoint.
        log_data: iterable of gzip-compressed byte payloads (None → no-op).

    Returns:
        List of results from the gathered push_logs coroutines.
    """
    # None sentinel instead of the original mutable default ({} shared
    # across calls is a classic Python pitfall); behavior is unchanged
    # because iterating an empty dict also yields nothing.
    json_data = [] if log_data is None else log_data
    headers = {"Content-Encoding": "gzip"}
    async with aiohttp.ClientSession(loop=loop, headers=headers) as session:
        tasks = [push_logs(session, url, data) for data in json_data]
        return await asyncio.gather(*tasks)
def compress_data(data="", keyword=""):
    """Split a parsed log file into individually gzip-compressed entries.

    Args:
        data: decoded JSON object for one log file.
        keyword: key under which the list of log entries lives
            (e.g. "Records" for CloudTrail).

    Returns:
        List of gzip-compressed byte strings, one per entry, small enough
        to be posted to Sumologic one at a time.
    """
    # Serialize each entry back to JSON, encode to bytes, then compress.
    return [
        gzip.compress(json.dumps(entry).encode())
        for entry in data[keyword]
    ]
def get_json_from_compressed_object(bucket="", key=""):
    """Download a gzip-compressed S3 object and decode it as JSON.

    Args:
        bucket: S3 bucket name.
        key: object key of the compressed log file.

    Returns:
        The decompressed object parsed as a JSON value.
    """
    s3 = boto3.client('s3')
    raw_bytes = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    return json.loads(gzip.decompress(raw_bytes))
def transform_data(bucket="", key="", keyword=""):
    """Fetch one log object from S3 and break it into compressed entries.

    Args:
        bucket: S3 bucket name.
        key: object key of the log file.
        keyword: JSON key holding the list of log entries.

    Returns:
        List of gzip-compressed payloads ready to send individually.
    """
    parsed = get_json_from_compressed_object(bucket=bucket, key=key)
    return compress_data(data=parsed, keyword=keyword)
def lambda_handler(event, context):
    """Lambda entry point: forward a new S3 log object to Sumologic.

    Triggered by S3 ObjectCreated events. Only CloudTrail logs and Config
    ConfigSnapshot files are forwarded; anything else is skipped.

    Args:
        event: S3 event notification payload.
        context: Lambda context (unused).
    """
    bucket = event['Records'][0]['s3']['bucket']['name']
    # S3 event notification keys are URL-encoded (spaces arrive as '+');
    # decode before splitting or fetching. urllib.parse was imported but
    # never used in the original — this is the intended use.
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    print(f'Key name: {key}')
    split_key = key.split('/')

    # Guard clause: too-shallow keys previously fell through past the
    # if-block and hit the final print with `service` undefined (NameError).
    if len(split_key) <= 3:
        return

    service = split_key[2]
    print(f'Service is: {service}')
    if service == "CloudTrail":
        log_data = transform_data(
            bucket=bucket,
            key=key,
            keyword=services[service]['keyword']
        )
    elif service == "Config":
        # split_key[7] is the log-type segment, so at least 8 elements are
        # required; the original `>= 7` check allowed an IndexError when
        # the key had exactly 7 segments.
        if len(split_key) >= 8:
            log_type = split_key[7]
            if log_type != "ConfigSnapshot":
                return
            log_data = transform_data(
                bucket=bucket,
                key=key,
                keyword=services[service]['keyword']
            )
        else:
            return
    else:
        print(f'This log type is not getting logged today: {service}')
        return
    print(f'Logging for service: {service}')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        main(
            loop=loop,
            url=services[service]['url'],
            log_data=log_data
        )
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment