-
-
Save nicornk/987bba7d1fe3e1f7ebcbc1648829d3f0 to your computer and use it in GitHub Desktop.
Boto3 Tail AWS Cloudwatch Logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Using boto3, instead of awscli, to tail logs. | |
Based on https://github.com/aws/aws-cli/blob/v2/awscli/customizations/logs/tail.py | |
and | |
https://gist.github.com/alanyee/601f995bfd6acfd4c3c16ee7e9115ab5 | |
""" | |
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Generator

import boto3
from botocore.exceptions import ClientError, ParamValidationError
from botocore.utils import parse_timestamp, datetime2timestamp
# Seconds to wait between polls once the last page of events has been read.
SLEEP = 5
class TimestampUtils(object):
    """Convert timestamp strings to epoch milliseconds and back.

    Accepts relative timestamps such as ``"10m"`` or ``"2h"`` (an integer
    amount followed by a unit); anything that does not match the relative
    form is delegated to botocore's ``parse_timestamp``.
    """

    # <amount><unit> anchored at the end: seconds, minutes, hours, days, weeks.
    _RELATIVE_TIMESTAMP_REGEX = re.compile(r"(?P<amount>\d+)(?P<unit>s|m|h|d|w)$")
    _TO_SECONDS = {
        "s": 1,
        "m": 60,
        "h": 3600,
        "d": 24 * 3600,
        "w": 7 * 24 * 3600,
    }

    def __init__(self, now=None):
        # ``now`` is a zero-argument callable returning the current datetime;
        # injectable for deterministic tests. Defaults to naive UTC "now".
        self._now = datetime.utcnow if now is None else now

    def to_epoch_millis(self, timestamp):
        """Return *timestamp* (relative like "10m", or absolute) as epoch millis.

        :param timestamp: a string; relative forms are resolved against
            ``self._now()``, others are parsed by botocore.
        :return: integer milliseconds since the Unix epoch.
        """
        re_match = self._RELATIVE_TIMESTAMP_REGEX.match(timestamp)
        if re_match:
            datetime_value = self._relative_timestamp_to_datetime(
                int(re_match.group("amount")), re_match.group("unit")
            )
        else:
            datetime_value = parse_timestamp(timestamp)
        return int(datetime2timestamp(datetime_value) * 1000)

    def to_datetime(self, millis_timestamp):
        """Return an aware UTC datetime for an epoch-milliseconds value.

        Bug fix: the original called ``tz.tzutc()`` but ``tz`` (dateutil)
        was never imported, so this method always raised ``NameError``.
        The stdlib ``timezone.utc`` is equivalent here.
        """
        return datetime.fromtimestamp(millis_timestamp / 1000.0, timezone.utc)

    def _relative_timestamp_to_datetime(self, amount, unit):
        # e.g. (10, "m") -> now minus 10 minutes.
        multiplier = self._TO_SECONDS[unit]
        return self._now() + timedelta(seconds=amount * multiplier * -1)
class LogStreamTailer:
    """Continuously poll a CloudWatch Logs group and yield log events.

    Wraps the ``filter_log_events`` API: follows ``nextToken`` pages while
    they exist and, once caught up, re-polls from the newest seen timestamp,
    de-duplicating events the service returns again.
    """

    def __init__(self, logs_client, log_group_name, since=None):
        """
        :param logs_client: a boto3 CloudWatch Logs client.
        :param log_group_name: name of the log group to tail.
        :param since: optional start time as a string -- relative ("10m")
            or absolute. When omitted, no ``startTime`` filter is sent.
            (Bug fix: the original always called
            ``TimestampUtils().to_epoch_millis(since)``, so the declared
            default ``None`` raised a ``TypeError`` inside the regex match.)
        """
        self.client = logs_client
        # Passed to the boto3 logs client's filter_log_events call.
        self.filter_logs_events_kwargs = {
            "logGroupName": log_group_name,
            # "logStreamNames": [],  # optionally restrict to specific streams
            "interleaved": True,
        }
        if since is not None:
            self.filter_logs_events_kwargs["startTime"] = TimestampUtils().to_epoch_millis(
                since
            )

    def tail(self) -> Generator[dict, None, None]:
        """Return a generator of log-event dicts; runs until interrupted."""
        return self._filter_log_events(self.filter_logs_events_kwargs)

    def _filter_log_events(self, filter_logs_events_kwargs):
        try:
            for event in self._do_filter_log_events(filter_logs_events_kwargs):
                yield event
        except KeyboardInterrupt:
            # The only way to exit from the follow loop is Ctrl-C, so exit
            # the iterator cleanly instead of letting the KeyboardInterrupt
            # propagate to the rest of the command.
            return

    def _get_latest_events_and_timestamp(self, event_ids_per_timestamp):
        """Shrink the seen-ids map to only the newest timestamp's ids."""
        if event_ids_per_timestamp:
            # Keep only ids of the events with the newest timestamp;
            # older timestamps can no longer produce duplicates.
            newest_timestamp = max(event_ids_per_timestamp.keys())
            event_ids_per_timestamp = defaultdict(
                set, {newest_timestamp: event_ids_per_timestamp[newest_timestamp]}
            )
        return event_ids_per_timestamp

    def _reset_filter_log_events_params(self, fle_kwargs, event_ids_per_timestamp):
        """Prepare *fle_kwargs* for re-polling after the last page.

        Removes ``nextToken`` and moves ``startTime`` forward to the
        timestamp of the newest event seen so far.
        """
        if event_ids_per_timestamp:
            fle_kwargs["startTime"] = max(event_ids_per_timestamp.keys())
        fle_kwargs.pop("nextToken", None)

    def _do_filter_log_events(self, filter_logs_events_kwargs):
        """Yield events forever, paging and re-polling as needed."""
        event_ids_per_timestamp = defaultdict(set)
        while True:
            # Let ClientError / ParamValidationError propagate to the caller.
            # (The original wrapped this call in a try/except that merely
            # re-raised the same exception -- a no-op.)
            response = self.client.filter_log_events(**filter_logs_events_kwargs)
            for event in response["events"]:
                # For the case where we've hit the last page, we will be
                # reusing the newest timestamp of the received events to keep
                # polling. This means duplicate log events with the same
                # timestamp may come back, which we do not want to yield again.
                if event["eventId"] not in event_ids_per_timestamp[event["timestamp"]]:
                    event_ids_per_timestamp[event["timestamp"]].add(event["eventId"])
                    yield event
            event_ids_per_timestamp = self._get_latest_events_and_timestamp(
                event_ids_per_timestamp
            )
            if "nextToken" in response:
                filter_logs_events_kwargs["nextToken"] = response["nextToken"]
            else:
                self._reset_filter_log_events_params(
                    filter_logs_events_kwargs, event_ids_per_timestamp
                )
                time.sleep(SLEEP)
if __name__ == "__main__": | |
client = boto3.client( | |
"logs" | |
) | |
tailer = LogStreamTailer( | |
logs_client=client, | |
log_group_name="logGroup1", | |
since="10m", | |
) | |
for event in tailer.tail(): | |
print(event) | |
if "limit 1337" in event["message"]: | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment