-
-
Save avoidik/66f11f9aa6c195dcfbc5944030154b97 to your computer and use it in GitHub Desktop.
Deploys lambda functions to forward cloudwatch logs to logstash
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Description: Deploys lambda functions to forward cloudwatch logs to logstash | |
Parameters: | |
coreNetworkingStackName: | |
Type: String | |
Resources: | |
lambdaRole: | |
Type: "AWS::IAM::Role" | |
Properties: | |
AssumeRolePolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Principal: | |
Service: "lambda.amazonaws.com" | |
Action: "sts:AssumeRole" | |
ManagedPolicyArns: | |
- arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole | |
lambdaPolicy: | |
Type: "AWS::IAM::Policy" | |
Properties: | |
PolicyDocument: | |
Version: "2012-10-17" | |
Statement: | |
- Effect: Allow | |
Action: | |
- "logs:PutRetentionPolicy" | |
- "logs:describeLogGroups" | |
- "logs:putSubscriptionFilter" | |
- "logs:deleteSubscriptionFilter" | |
Resource: "*" | |
PolicyName: !Ref "AWS::StackName" | |
Roles: | |
- !Ref lambdaRole | |
cloudwatchPermission: | |
Type: AWS::Lambda::Permission | |
Properties: | |
FunctionName: !Ref processLogGroupFunction | |
Action: lambda:InvokeFunction | |
Principal: logs.amazonaws.com | |
SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" | |
SourceAccount: !Ref "AWS::AccountId" | |
lambdaSecurityGroup: | |
Type: AWS::EC2::SecurityGroup | |
Properties: | |
GroupDescription: logstash cloudwatch log lambda processor | |
VpcId: | |
Fn::ImportValue: !Sub ${coreNetworkingStackName}:vpcId | |
processLogGroupFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '300' | |
VpcConfig: | |
SecurityGroupIds: | |
- !Ref lambdaSecurityGroup | |
SubnetIds: | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2A | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2B | |
- Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2C | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python3 | |
import socket | |
import sys | |
import json | |
import gzip | |
import copy | |
import base64 | |
import re | |
def transform(data): | |
new_data = copy.deepcopy(data) | |
new_data['@metadata'] = { | |
"beat": "lambda", | |
"version": "0.0.1" | |
} | |
if 'service' in data: | |
if data['service'] == 'blah': | |
new_data['@metadata']['beat'] = 'ecs' | |
if 'timestamp' in data: | |
del new_data['timestamp'] | |
new_data['lambda_timestamp'] = data['timestamp'] | |
if 'port' in data: | |
del new_data['port'] | |
return new_data | |
def send_log(data): | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect(('logstash.xxx.internal', 5000)) | |
s.sendall(str(json.dumps(data)).encode('utf-8')) | |
s.send('\n'.encode('utf-8')) | |
s.close() | |
def handler(event, context): | |
decompressed = gzip.decompress( | |
base64.b64decode(event['awslogs']['data'])).decode('utf-8') | |
try: | |
data = json.loads(decompressed) | |
except Exception as e: | |
return | |
for str_event in data['logEvents']: | |
try: | |
e = json.loads(str_event['message']) | |
except Exception as e: | |
return | |
if 'level' in e: | |
if e['level'] == 'debug': | |
return | |
send_log(transform(e)) | |
if __name__ == '__main__': | |
handler(None, None) | |
updateAllLogGroupsFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '30' | |
Environment: | |
Variables: | |
SUSBCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python3 | |
import boto3 | |
import os | |
c = boto3.client('logs') | |
bad_groups = [ | |
'core-networking', | |
's3av', | |
'healthcheck', | |
'updateLogGroupFunction', | |
'processLogGroupFunction' | |
] | |
good_groups = [ | |
'adamite', | |
'/aws/lambda' | |
] | |
def get_log_groups(): | |
groups = [] | |
params = {} | |
while True: | |
response = c.describe_log_groups(**params) | |
for group in response['logGroups']: | |
is_bad_group = False | |
for bg in bad_groups: | |
if bg in group['logGroupName']: | |
is_bad_group = True | |
if not is_bad_group: | |
for gg in good_groups: | |
if group['logGroupName'].startswith(gg): | |
groups.append(group['logGroupName']) | |
else: | |
delete_subscription(group['logGroupName']) | |
if 'nextToken' in response: | |
params['nextToken'] = response['nextToken'] | |
else: | |
break | |
return groups | |
def delete_subscription(group): | |
try: | |
c.delete_subscription_filter( | |
logGroupName=group, | |
filterName='logstash') | |
except Exception as e: | |
print(e) | |
def create_subscription(group): | |
sub_arn = os.getenv('SUSBCRIPTION_FUNCTION_ARN') | |
c.put_subscription_filter( | |
logGroupName=group, | |
filterName='logstash', | |
filterPattern='', | |
destinationArn=sub_arn | |
) | |
def handler(event, context): | |
groups = get_log_groups() | |
for g in groups: | |
create_subscription(g) | |
# delete_subscription(g) | |
if __name__ == '__main__': | |
handler(None, None) | |
updateLogGroupFunction: | |
Type: AWS::Lambda::Function | |
Properties: | |
Handler: index.handler | |
Role: !GetAtt lambdaRole.Arn | |
Runtime: python3.6 | |
Timeout: '30' | |
Environment: | |
Variables: | |
SUSBCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn | |
Code: | |
ZipFile: !Sub | | |
#!/usr/bin/env python | |
import boto3 | |
import json | |
import os | |
logs = boto3.client('logs') | |
bad_groups = [ | |
'core-networking', | |
's3av', | |
'healthcheck', | |
'updateLogGroupFunction', | |
'processLogGroupFunction' | |
] | |
good_groups = [ | |
'adamite', | |
'/aws/lambda' | |
] | |
def create_subscription(group): | |
for g in bad_groups: | |
if g in group: | |
return | |
sub_arn = os.getenv('SUSBCRIPTION_FUNCTION_ARN') | |
logs.put_subscription_filter( | |
logGroupName=group, | |
filterName='logstash', | |
filterPattern='', | |
destinationArn=sub_arn | |
) | |
def handler(event, context): | |
try: | |
log_group = event['detail']['requestParameters']['logGroupName'] | |
except Exception as e: | |
print(e) | |
print(json.dumps(event)) | |
if log_group.startswith('/aws/lambda'): | |
for gg in good_groups: | |
if log_group.startswith(gg): | |
create_subscription(log_group) | |
break | |
if __name__ == '__main__': | |
handler(None, None) | |
logCreateEventPermission: | |
Type: AWS::Lambda::Permission | |
Properties: | |
FunctionName: !Ref updateLogGroupFunction | |
Action: lambda:InvokeFunction | |
Principal: events.amazonaws.com | |
SourceArn: !GetAtt logCreateEvent.Arn | |
logCreateEvent: | |
Type: AWS::Events::Rule | |
Properties: | |
Description: Triggers logstash subscription on new log groups | |
State: ENABLED | |
Targets: | |
- Arn: !GetAtt updateLogGroupFunction.Arn | |
Id: updateLogGroupFunction | |
EventPattern: | | |
{ | |
"source": [ | |
"aws.logs" | |
], | |
"detail-type": [ | |
"AWS API Call via CloudTrail" | |
], | |
"detail": { | |
"eventSource": [ | |
"logs.amazonaws.com" | |
], | |
"eventName": [ | |
"CreateLogGroup" | |
] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment