Skip to content

Instantly share code, notes, and snippets.

@nicornk
Forked from alanyee/tail.py
Last active July 7, 2022 07:43
Show Gist options
  • Save nicornk/987bba7d1fe3e1f7ebcbc1648829d3f0 to your computer and use it in GitHub Desktop.
Save nicornk/987bba7d1fe3e1f7ebcbc1648829d3f0 to your computer and use it in GitHub Desktop.
Boto3 Tail AWS Cloudwatch Logs
"""Using boto3, instead of awscli, to tail logs.
Based on https://github.com/aws/aws-cli/blob/v2/awscli/customizations/logs/tail.py
and
https://gist.github.com/alanyee/601f995bfd6acfd4c3c16ee7e9115ab5
"""
import re
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Generator

import boto3
from botocore.exceptions import ClientError, ParamValidationError
from botocore.utils import parse_timestamp, datetime2timestamp
SLEEP = 5
class TimestampUtils(object):
    """Convert user-supplied timestamps to/from the epoch-millisecond
    integers used by the CloudWatch Logs API.

    Accepts relative timestamps like ``"30s"``, ``"10m"``, ``"2h"``,
    ``"1d"``, ``"1w"`` as well as any absolute format understood by
    botocore's ``parse_timestamp``.
    """

    # Matches a relative timestamp: an integer amount plus a unit suffix.
    _RELATIVE_TIMESTAMP_REGEX = re.compile(r"(?P<amount>\d+)(?P<unit>s|m|h|d|w)$")
    _TO_SECONDS = {
        "s": 1,
        "m": 60,
        "h": 3600,
        "d": 24 * 3600,
        "w": 7 * 24 * 3600,
    }

    def __init__(self, now=None):
        """
        :param now: zero-arg callable returning the current ``datetime``;
            injectable for testing. Defaults to ``datetime.utcnow``.
        """
        self._now = now
        if now is None:
            self._now = datetime.utcnow

    def to_epoch_millis(self, timestamp):
        """Return ``timestamp`` (relative or absolute string) as integer
        milliseconds since the Unix epoch."""
        re_match = self._RELATIVE_TIMESTAMP_REGEX.match(timestamp)
        if re_match:
            datetime_value = self._relative_timestamp_to_datetime(
                int(re_match.group("amount")), re_match.group("unit")
            )
        else:
            datetime_value = parse_timestamp(timestamp)
        return int(datetime2timestamp(datetime_value) * 1000)

    def to_datetime(self, millis_timestamp):
        """Return an aware UTC ``datetime`` for an epoch-millisecond value.

        Bug fix: the original called ``tz.tzutc()`` but ``tz`` (dateutil)
        was never imported, so this method always raised ``NameError``.
        The stdlib ``timezone.utc`` is equivalent and dependency-free.
        """
        return datetime.fromtimestamp(millis_timestamp / 1000.0, timezone.utc)

    def _relative_timestamp_to_datetime(self, amount, unit):
        # Subtract the relative offset from "now".
        multiplier = self._TO_SECONDS[unit]
        return self._now() + timedelta(seconds=amount * multiplier * -1)
class LogStreamTailer:
    """Tail a CloudWatch Logs group via ``filter_log_events``, polling
    forever and yielding each log event dict exactly once."""

    def __init__(self, logs_client, log_group_name, since=None):
        """
        :param logs_client: a boto3 CloudWatch Logs client.
        :param log_group_name: name of the log group to tail.
        :param since: optional start time, relative (``"10m"``) or absolute.
            When None, no ``startTime`` filter is applied. (Bug fix: the
            original unconditionally passed ``since`` to TimestampUtils,
            so the documented default of None raised TypeError.)
        """
        self.client = logs_client
        # Passed straight through to boto3's filter_log_events.
        self.filter_logs_events_kwargs = {
            "logGroupName": log_group_name,
            # "logStreamNames": [],  # optionally restrict to specific streams
            # NOTE(review): "interleaved" looks deprecated/ignored by newer
            # CloudWatch Logs API versions — confirm against boto3 docs.
            "interleaved": True,
        }
        if since is not None:
            self.filter_logs_events_kwargs["startTime"] = (
                TimestampUtils().to_epoch_millis(since)
            )

    def tail(self) -> Generator[dict, None, None]:
        """Return a generator of log event dicts; runs until Ctrl-C."""
        return self._filter_log_events(self.filter_logs_events_kwargs)

    def _filter_log_events(self, filter_logs_events_kwargs):
        try:
            for event in self._do_filter_log_events(filter_logs_events_kwargs):
                yield event
        except KeyboardInterrupt:
            # The only way to exit from the follow loop is Ctrl-C, so end
            # the iterator rather than letting KeyboardInterrupt propagate.
            return

    def _get_latest_events_and_timestamp(self, event_ids_per_timestamp):
        """Shrink the dedup map to only the newest timestamp's event ids."""
        if event_ids_per_timestamp:
            newest_timestamp = max(event_ids_per_timestamp.keys())
            event_ids_per_timestamp = defaultdict(
                set, {newest_timestamp: event_ids_per_timestamp[newest_timestamp]}
            )
        return event_ids_per_timestamp

    def _reset_filter_log_events_params(self, fle_kwargs, event_ids_per_timestamp):
        """Drop nextToken and move startTime up to the newest seen event."""
        if event_ids_per_timestamp:
            fle_kwargs["startTime"] = max(event_ids_per_timestamp.keys())
        fle_kwargs.pop("nextToken", None)

    def _do_filter_log_events(self, filter_logs_events_kwargs):
        # Maps event timestamp -> set of event ids already yielded at that
        # timestamp, so re-polling from the newest timestamp does not yield
        # duplicates.
        event_ids_per_timestamp = defaultdict(set)
        while True:
            # Let ClientError / ParamValidationError propagate naturally;
            # the original caught them only to immediately re-raise.
            response = self.client.filter_log_events(**filter_logs_events_kwargs)
            for event in response["events"]:
                # When we've hit the last page we re-poll from the newest
                # timestamp, which can return events we already yielded —
                # only yield ones not seen before.
                if event["eventId"] not in event_ids_per_timestamp[event["timestamp"]]:
                    event_ids_per_timestamp[event["timestamp"]].add(event["eventId"])
                    yield event
            event_ids_per_timestamp = self._get_latest_events_and_timestamp(
                event_ids_per_timestamp
            )
            if "nextToken" in response:
                filter_logs_events_kwargs["nextToken"] = response["nextToken"]
            else:
                self._reset_filter_log_events_params(
                    filter_logs_events_kwargs, event_ids_per_timestamp
                )
                time.sleep(SLEEP)
if __name__ == "__main__":
    # Example: tail "logGroup1" starting 10 minutes back, stopping once a
    # marker message is observed.
    logs = boto3.client("logs")
    tailer = LogStreamTailer(
        logs_client=logs,
        log_group_name="logGroup1",
        since="10m",
    )
    for event in tailer.tail():
        print(event)
        if "limit 1337" in event["message"]:
            break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment