-
-
Save oxidizeddreams/162e6291afbef4e7683007e2d6e5a13b to your computer and use it in GitHub Desktop.
AWS lambda function to subscribe new CloudWatch Log groups to another lambda function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Lambda function to subscribe all new CloudWatch log groups to a log shipper function
# Used in conjunction with https://github.com/SumoLogic/sumologic-aws-lambda
import os | |
import logging | |
import json | |
import uuid | |
import boto3 | |
from botocore.exceptions import ClientError | |
# Logging: root logger at DEBUG, with botocore capped at WARNING.
logging.getLogger('botocore').setLevel(logging.WARNING)
log = logging.getLogger()
log.setLevel(logging.DEBUG)

# boto3 clients used by the functions in this module.
logs = boto3.client("logs")
l = boto3.client("lambda")

# Function that receives the subscriptions; read from the TARGET_FUNCTION
# environment variable (a KeyError is raised at import time if it is unset).
LOG_SHIPPER_FUNCTION = os.environ['TARGET_FUNCTION']

# Filter name used for every subscription filter created here.
FILTER_NAME = "all"
def get_log_groups():
    """Return every CloudWatch log group visible to the ``logs`` client.

    Calls ``describe_log_groups`` repeatedly, following ``nextToken``
    until the response no longer carries one, and returns the
    concatenated ``logGroups`` entries as a list.
    """
    collected = []
    page_args = {}
    while True:
        page = logs.describe_log_groups(**page_args)
        collected.extend(page["logGroups"])
        token = page.get("nextToken")
        if not token:
            return collected
        # Ask for the page that follows the one just consumed.
        page_args = {"nextToken": token}
def get_policy():
    """Fetch the resource policy attached to the log shipper function.

    Returns:
        The policy document parsed from JSON, or ``None`` when the
        ``get_policy`` call raises a ``ClientError`` or the response
        carries no ``Policy`` value.
    """
    try:
        response = l.get_policy(FunctionName=LOG_SHIPPER_FUNCTION)
    except ClientError as err:
        error_code = err.response.get("Error", {}).get("Code")
        if error_code == "ResourceNotFoundException":
            log.warning("No policy found for {}".format(LOG_SHIPPER_FUNCTION))
        else:
            # Stay best-effort (return None) but do not disguise other
            # failures, e.g. missing permissions, as "no policy".
            log.warning("Could not fetch policy for {}: {}".format(
                LOG_SHIPPER_FUNCTION, err))
        return None

    policy = response.get("Policy")
    if not policy:
        # json.loads(None) would raise TypeError; treat as "no policy".
        return None
    return json.loads(policy)
def remove_permission(sid):
    """Remove the policy statement identified by ``sid`` from the log
    shipper function's resource policy."""
    request = {
        "FunctionName": LOG_SHIPPER_FUNCTION,
        "StatementId": sid,
    }
    l.remove_permission(**request)
def get_shipper_arn():
    """Return the ARN of the log shipper function, read from the
    ``Configuration.FunctionArn`` field of a ``get_function`` call."""
    configuration = l.get_function(
        FunctionName=LOG_SHIPPER_FUNCTION)["Configuration"]
    return configuration["FunctionArn"]
def subscribe_log_group(lg_name, region, account_id):
    """Subscribe the log group ``lg_name`` to the log shipper function.

    First adds an ``lambda:InvokeFunction`` permission on the shipper
    for the CloudWatch Logs principal of ``region``, scoped to the log
    group's ARN and to ``account_id``; then creates a subscription
    filter named ``FILTER_NAME`` with an empty pattern that targets the
    shipper's ARN.
    """
    log.info("Setting permissions")
    statement_id = str(uuid.uuid4())  # fresh statement id per call
    principal = "logs.{}.amazonaws.com".format(region)
    source_arn = "arn:aws:logs:{}:{}:log-group:{}:*".format(
        region, account_id, lg_name)
    grant = l.add_permission(
        FunctionName=LOG_SHIPPER_FUNCTION,
        StatementId=statement_id,
        Action='lambda:InvokeFunction',
        Principal=principal,
        SourceArn=source_arn,
        SourceAccount=account_id,
    )
    log.debug("Response: {}".format(grant))

    log.info("Subscribing log group")
    destination = get_shipper_arn()
    logs.put_subscription_filter(
        logGroupName=lg_name,
        filterName=FILTER_NAME,
        filterPattern='',
        destinationArn=destination,
    )
def remove_stale_subscriptions():
    """Remove shipper permissions that refer to deleted log groups.

    CloudWatch Log groups that were subscribed to the log shipper but
    have since been deleted leave their permission behind, and it cannot
    be cleaned up in the UI. This compares the source ARN of each policy
    statement against the ARNs of the existing log groups and removes
    the permission of every log-group statement with no matching group.

    Statements without an ``ArnLike`` / ``AWS:SourceArn`` condition, or
    whose source ARN is not a log group, are left untouched so that
    permissions granted to other event sources are never removed.
    Returns ``None``; does nothing when no policy could be fetched.
    """
    log.info("Fetching function policy")
    policy = get_policy()
    if not policy:
        return

    log.info("Fetching log groups")
    log_groups = get_log_groups()
    log.info("Discovered {} log groups".format(len(log_groups)))
    # A set gives constant-time membership tests in the loop below.
    live_arns = {lg["arn"] for lg in log_groups}

    for statement in policy.get("Statement", []):
        policy_arn = (
            statement.get("Condition", {})
            .get("ArnLike", {})
            .get("AWS:SourceArn")
        )
        # Only statements scoped to a log group are candidates for removal.
        if not isinstance(policy_arn, str) or ":log-group:" not in policy_arn:
            continue
        if policy_arn not in live_arns:
            log.info("Identified missing log group. Removing the permission")
            remove_permission(statement["Sid"])
def lambda_handler(event, context):
    """Lambda entry point.

    Reads the region and account id from fields 3 and 4 of the
    colon-separated ``context.invoked_function_arn``, cleans up stale
    permissions, and then, if ``event['detail']['eventName']`` is
    ``CreateLogGroup``, subscribes the log group named in the event's
    request parameters to the log shipper.
    """
    log.info("Running log subscriber")
    arn_fields = context.invoked_function_arn.split(':')
    region, account_id = arn_fields[3], arn_fields[4]

    remove_stale_subscriptions()

    # Commence subscription functionality
    detail = event['detail']
    if detail['eventName'] != "CreateLogGroup":
        return
    new_group = detail['requestParameters']['logGroupName']
    subscribe_log_group(new_group, region, account_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment