Copy existing logs from CloudWatch to Elasticsearch
import json
import boto3
import gzip
import base64
import time

# AWS account ID
AWS_ACCOUNT_ID = "001234567890"
# CloudWatch log group name
CW_GROUP = 'my-logs'
# CloudWatch log stream names, set to None to copy from all streams
CW_STREAMS = ['my-stream']
# Lambda function name, see https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_ES_Stream.html
LAMBDA_NAME = 'LogsToElasticsearch'
# Copy logs from this timestamp in milliseconds
START_TIME = 1595030400000
# Copy logs up to this timestamp in milliseconds
END_TIME = 1595116799999
# Number of messages to copy in one batch
BATCH_SIZE = 1000
# Number of seconds to wait between batches
BATCH_INTERVAL = 1.0
# Execute without actually sending events to ES
DRY_RUN = False

logs_client = boto3.client('logs')
lambda_client = boto3.client('lambda')
def get_streams():
    # Yield every log stream name in the group, following pagination tokens.
    token = None
    while True:
        response = get_streams_batch(token)
        for row in response['logStreams']:
            yield row['logStreamName']
        if 'nextToken' in response:
            token = response['nextToken']
        else:
            break


def get_streams_batch(next_token=None):
    if next_token is None:
        return logs_client.describe_log_streams(
            logGroupName=CW_GROUP,
            orderBy='LogStreamName',
        )
    return logs_client.describe_log_streams(
        logGroupName=CW_GROUP,
        orderBy='LogStreamName',
        nextToken=next_token,
    )
def get_events(stream_name):
    # Yield every event in the stream within the configured time range.
    token = None
    while True:
        response = get_events_batch(stream_name, token)
        if len(response['events']) == 0:
            break
        for row in response['events']:
            yield row
        if 'nextForwardToken' in response:
            token = response['nextForwardToken']
        else:
            break


def get_events_batch(stream_name, next_token=None):
    if next_token is None:
        return logs_client.get_log_events(
            logGroupName=CW_GROUP,
            logStreamName=stream_name,
            startTime=START_TIME,
            endTime=END_TIME,
            startFromHead=True,
        )
    return logs_client.get_log_events(
        logGroupName=CW_GROUP,
        logStreamName=stream_name,
        # Keep the time bounds on continuation pages so events outside the
        # requested range are not copied.
        startTime=START_TIME,
        endTime=END_TIME,
        nextToken=next_token,
    )
# If no streams are configured, copy from every stream in the group.
if CW_STREAMS is None:
    CW_STREAMS = get_streams()


def send_batch(stream_name, events):
    # Wrap the events in the same gzipped, base64-encoded payload that a
    # CloudWatch Logs subscription filter delivers to the Lambda function.
    payload = {
        "awslogs": {
            "data": base64.b64encode(gzip.compress(json.dumps({
                "messageType": "DATA_MESSAGE",
                "owner": AWS_ACCOUNT_ID,
                "logGroup": CW_GROUP,
                "logStream": stream_name,
                "subscriptionFilters": [],
                "logEvents": events,
            }).encode('utf-8'))).decode('utf-8')
        }
    }
    if not DRY_RUN:
        lambda_client.invoke(
            FunctionName=LAMBDA_NAME,
            InvocationType='Event',
            LogType='None',
            Payload=json.dumps(payload).encode('utf-8'),
        )
    print('sent ' + events[-1]['id'])
    # print(json.dumps(events, indent=4))
    if BATCH_INTERVAL > 0:
        time.sleep(BATCH_INTERVAL)


i = 0
for stream_name in CW_STREAMS:
    events = []
    for event in get_events(stream_name):
        # Build a unique, sortable document ID from the timestamp and a counter.
        event_id = str(event['timestamp']) + str(i).zfill(20)
        events.append({
            'id': event_id,
            'timestamp': event['timestamp'],
            'message': event['message'],
        })
        i += 1
        if len(events) == BATCH_SIZE:
            send_batch(stream_name, events)
            events = []
    # Flush the final partial batch for this stream.
    if events:
        send_batch(stream_name, events)
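For reference, here is a minimal sketch (not part of the original script; the decode_payload name is illustrative) of how one of these payloads can be unpacked for inspection, for example while testing with DRY_RUN enabled. It simply reverses the base64 and gzip wrapping to reveal the DATA_MESSAGE document the Lambda function receives:

def decode_payload(payload):
    # Reverse the base64 + gzip wrapping applied in send_batch() to inspect
    # the CloudWatch Logs subscription message the Lambda function receives.
    data = base64.b64decode(payload['awslogs']['data'])
    return json.loads(gzip.decompress(data).decode('utf-8'))

Calling decode_payload(payload) inside send_batch(), e.g. behind the DRY_RUN flag, prints the exact message that would otherwise be delivered to the LogsToElasticsearch function.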