Skip to content

Instantly share code, notes, and snippets.

@eastside
Created October 25, 2021 00:21
Show Gist options
  • Save eastside/7014561a5ebca19616012d49170eb3b3 to your computer and use it in GitHub Desktop.
Save eastside/7014561a5ebca19616012d49170eb3b3 to your computer and use it in GitHub Desktop.
Create or Stream to an AWS CloudWatch Logs Log Group/Stream
import json
import typing
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
def create_or_stream_to_log(
    aws_cli_logs,
    log_group_name: str,
    stream_name: str,
    msg_list: typing.Iterable[str],
    next_sequence_token: typing.Optional[str] = None,
    *,
    retention_in_days: int = 14,
    max_token_retries: int = 5,
) -> typing.Optional[str]:
    """
    Write the given messages as log lines to a log group/stream.

    The log group and/or the log stream are created on demand when the first
    write fails with ``ResourceNotFoundException``. A rejected sequence token
    (``InvalidSequenceTokenException``) is replaced by the token the service
    says it expects and the write is retried a bounded number of times.

    Args:
        aws_cli_logs: Client object exposing ``put_log_events``,
            ``create_log_group``, ``put_retention_policy`` and
            ``create_log_stream`` (e.g. ``boto3.client('logs')``).
        log_group_name: Name of the log group to write to.
        stream_name: Name of the log stream; coerced with ``str()``.
        msg_list: Messages to log; each one is coerced with ``str()``.
            Consumed exactly once, so a generator is fine.
        next_sequence_token: Token returned by a previous call, if any.
        retention_in_days: Retention applied to a log group that this call
            creates (not applied to groups that already exist).
        max_token_retries: How many times a rejected sequence token is
            replaced and the write retried before the error is re-raised.

    Returns:
        The ``nextSequenceToken`` from the service response, or ``None`` if
        the response carries none. When ``msg_list`` is empty nothing is sent
        and ``next_sequence_token`` is returned unchanged.

    Raises:
        ClientError: Any service error that is not handled as described
            above, or a handled one that persists after retrying.
    """
    stream_name = str(stream_name)

    # Build the batch once so retries reuse it (and one-shot iterables work).
    timestamp_ms = int(round(datetime.now().timestamp() * 1000))
    log_events = [
        {'timestamp': timestamp_ms, 'message': str(log_msg)}
        for log_msg in msg_list
    ]
    if not log_events:
        # An empty batch cannot be submitted; skip the round trip entirely.
        return next_sequence_token

    def _error_code(err):
        return err.response.get('Error', {}).get('Code')

    def _put_logs(token):
        kwargs = {
            'logGroupName': log_group_name,
            'logStreamName': stream_name,
            'logEvents': log_events,
        }
        if token:
            kwargs['sequenceToken'] = token
        response = aws_cli_logs.put_log_events(**kwargs)
        # The field may be missing from the response; don't crash on it.
        return response.get('nextSequenceToken')

    def _ensure_group_and_stream():
        try:
            aws_cli_logs.create_log_group(logGroupName=log_group_name)
        except ClientError as err:
            if _error_code(err) != 'ResourceAlreadyExistsException':
                raise
        else:
            # Only a group we just created gets a retention policy.
            aws_cli_logs.put_retention_policy(
                logGroupName=log_group_name,
                retentionInDays=retention_in_days,
            )
        try:
            aws_cli_logs.create_log_stream(
                logGroupName=log_group_name,
                logStreamName=stream_name,
            )
        except ClientError as err:
            # Another writer may have created the stream in the meantime.
            if _error_code(err) != 'ResourceAlreadyExistsException':
                raise

    def _expected_token(err):
        token = err.response.get('expectedSequenceToken')
        if token is None:
            # Fall back to the token embedded in the error message.
            message = err.response.get('Error', {}).get('Message') or ''
            _, marker, tail = message.partition(
                "The next expected sequenceToken is: "
            )
            token = tail.strip() if marker else None
        # The literal "null" means "send no token at all".
        if not token or token == 'null':
            return None
        return token

    token = next_sequence_token
    resources_ensured = False
    token_retries = 0
    while True:
        try:
            return _put_logs(token)
        except ClientError as err:
            code = _error_code(err)
            if code == 'ResourceNotFoundException' and not resources_ensured:
                # The log group or stream doesn't exist: create the missing
                # pieces, then write again without a token.
                _ensure_group_and_stream()
                resources_ensured = True
                token = None
            elif (
                code == 'InvalidSequenceTokenException'
                and token_retries < max_token_retries
            ):
                # Possibly a concurrent writer on the same stream; adopt the
                # token the service expects and try again (bounded).
                token_retries += 1
                token = _expected_token(err)
            else:
                raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment