@h-mochizuki
Last active October 26, 2018 11:04
Script to fetch logs from AWS (CloudWatch Logs)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
from datetime import datetime


def datetime2timestamp(dt: datetime) -> int:
    return int(dt.timestamp() * 1000)


def timestamp2datetime(ts: int) -> datetime:
    return datetime.fromtimestamp(ts / 1000)
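# CloudWatch Logs timestamps are integers in milliseconds since the Unix epoch,
# hence the "* 1000" / "/ 1000" in the helpers above. A quick round-trip sketch
# (exact for whole-second datetimes; the intermediate value depends on the
# local timezone):
#
#   ts = datetime2timestamp(datetime(2018, 10, 26, 12, 0, 0))
#   assert timestamp2datetime(ts) == datetime(2018, 10, 26, 12, 0, 0)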
class LogAccessor:
    """Thin wrapper around the boto3 CloudWatch Logs client."""

    def __init__(
        self,
        region_name: str = None,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None
    ):
        kwargs = {}
        if region_name:
            kwargs['region_name'] = region_name
        if aws_access_key_id:
            kwargs['aws_access_key_id'] = aws_access_key_id
        if aws_secret_access_key:
            kwargs['aws_secret_access_key'] = aws_secret_access_key
        if kwargs:
            self.client = boto3.client('logs', **kwargs)
        else:
            self.client = boto3.client('logs')
    def log_groups(
        self,
        name_prefix: str = None,
        limit=None,
        *,
        next_token: str = None
    ):
        """Yield LogGroup objects; follows pagination when no limit is given."""
        params = {}
        if name_prefix:
            params['logGroupNamePrefix'] = name_prefix
        if isinstance(limit, int):
            # describe_log_groups accepts a limit between 1 and 50
            params['limit'] = min(max(limit, 1), 50)
        if next_token:
            params['nextToken'] = next_token
        data = self.client.describe_log_groups(**params)
        log_groups = data.get('logGroups', [])
        token = data.get('nextToken')
        for log_group in log_groups:
            yield LogGroup(log_group, self)
        if token and limit is None:
            # continue with the next page ('return' would silently end the generator)
            yield from self.log_groups(
                name_prefix=name_prefix,
                limit=limit,
                next_token=token
            )
    def log_streams(
        self,
        log_group: str,
        name_prefix: str = None,
        order_by: str = 'LastEventTime',
        descending: bool = False,
        limit=None,
        *,
        next_token: str = None
    ):
        """Yield LogStream objects in a group; follows pagination when no limit is given."""
        params = {}
        if log_group:
            params['logGroupName'] = log_group
        if name_prefix:
            params['logStreamNamePrefix'] = name_prefix
        if order_by:
            params['orderBy'] = order_by
        if descending:
            params['descending'] = descending
        if isinstance(limit, int):
            params['limit'] = limit if 0 < limit else 1
        if next_token:
            params['nextToken'] = next_token
        data = self.client.describe_log_streams(**params)
        log_streams = data.get('logStreams', [])
        token = data.get('nextToken')
        for log_stream in log_streams:
            yield LogStream(log_group, log_stream, self)
        if token and limit is None:
            # continue with the next page
            yield from self.log_streams(
                log_group=log_group,
                name_prefix=name_prefix,
                order_by=order_by,
                descending=descending,
                limit=limit,
                next_token=token
            )
    def log_events(
        self,
        log_group: str,
        log_stream: str,
        start_time=None,
        end_time=None,
        limit=None,
        start_from_head: bool = False,
        *,
        next_token: str = None
    ):
        """Yield LogEvent objects from a single page of get_log_events."""
        params = {}
        if log_group:
            params['logGroupName'] = log_group
        if log_stream:
            params['logStreamName'] = log_stream
        if isinstance(start_time, int):
            params['startTime'] = start_time if 0 < start_time else 1
        if isinstance(end_time, int):
            params['endTime'] = end_time if 0 < end_time else 1
        if isinstance(limit, int):
            params['limit'] = limit if 0 < limit else 1
        if start_from_head:
            params['startFromHead'] = start_from_head
        if next_token:
            params['nextToken'] = next_token
        data = self.client.get_log_events(**params)
        events = data.get('events', [])
        for event in events:
            yield LogEvent(log_group, log_stream, event, self)
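# Minimal usage sketch for LogAccessor (the region and group-name prefix below
# are illustrative; with no arguments boto3 falls back to its normal
# credential/region resolution):
#
#   accessor = LogAccessor(region_name='ap-northeast-1')
#   for group in accessor.log_groups(name_prefix='/aws/lambda/'):
#       for stream in group.log_streams(descending=True, limit=5):
#           for event in stream.log_events(start_from_head=True):
#               print(event.message)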
class AutoMapping:
    """Maps an API response dict onto class attributes and derives datetime helpers."""

    def __init__(self, mapping: dict = {}, accessor: LogAccessor = None, *args, **kwargs):
        self.mapping = mapping
        self.accessor = accessor if accessor else LogAccessor()
        for k, v in mapping.items():
            if hasattr(self, k):
                try:
                    setattr(self, k, v)
                except AttributeError:
                    pass

    def __getattr__(self, key: str):
        # '<name>_by_strftime' is the strftime method of '<name>_to_datetime'
        if key.endswith('_by_strftime') and hasattr(self, key[0:-12]):
            return getattr(self, '{}_to_datetime'.format(key[0:-12])).strftime
        # '<name>_to_datetime' converts the millisecond timestamp '<name>' to a datetime
        if key.endswith('_to_datetime') and hasattr(self, key[0:-12]):
            _timestamp = getattr(self, key[0:-12])
            if isinstance(_timestamp, int) and _timestamp > 0:
                return timestamp2datetime(_timestamp)
            return datetime.min
        raise AttributeError(key)

    def __str__(self) -> str:
        return str(self.mapping)

    def named_format(self, form: str, time_format: str = "%Y-%m-%dT%H:%M:%S.%f"):
        param = {}
        for k, v in self.mapping.items():
            param[k] = v
            try:
                key = k + "_to_datetime"
                param[key] = getattr(self, key)
            except AttributeError:
                pass
            try:
                key = k + "_by_strftime"
                param[key] = getattr(self, key)(time_format)
            except Exception:
                pass
        return form.format(**param)
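# Sketch of the derived attributes AutoMapping exposes: for any integer
# millisecond field '<name>' in the mapping, '<name>_to_datetime' yields a
# datetime, and '{<name>_by_strftime}' can be used inside named_format(),
# where it is rendered with the given time_format. For a LogEvent 'e':
#
#   e.timestamp_to_datetime
#   e.named_format("{timestamp_by_strftime} {message}", "%H:%M:%S")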
class LogGroup(AutoMapping):
    arn: str = None
    creationTime: int = None
    kmsKeyId: str = None
    logGroupName: str = None
    metricFilterCount: int = 0
    retentionInDays: int = 0
    storedBytes: int = 0

    def log_streams(
        self,
        name_prefix: str = None,
        order_by: str = 'LastEventTime',
        descending: bool = False,
        limit=None,
    ):
        return self.accessor.log_streams(
            log_group=self.logGroupName,
            name_prefix=name_prefix,
            order_by=order_by,
            descending=descending,
            limit=limit
        )
class LogStream(AutoMapping):
    logGroupName: str = None
    logStreamName: str = None
    creationTime: int = 0
    firstEventTimestamp: int = 0
    lastEventTimestamp: int = 0
    lastIngestionTime: int = 0
    uploadSequenceToken: str = None
    arn: str = None
    storedBytes: int = 0

    def __init__(self, log_group: str, data: dict = {}, accessor: LogAccessor = None):
        super().__init__(mapping={
            'logGroupName': log_group,
            **data
        }, accessor=accessor)

    def log_events(
        self,
        start_time=None,
        end_time=None,
        limit=None,
        start_from_head: bool = False,
        *, next_token: str = None
    ):
        return self.accessor.log_events(
            log_group=self.logGroupName,
            log_stream=self.logStreamName,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
            start_from_head=start_from_head,
            next_token=next_token
        )
class LogEvent(AutoMapping):
    logGroupName: str = None
    logStreamName: str = None
    message: str = None
    timestamp: int = 0
    ingestionTime: int = 0

    def __init__(self, log_group: str, log_stream: str, data: dict = {}, accessor: LogAccessor = None):
        super().__init__(mapping={
            'logGroupName': log_group,
            'logStreamName': log_stream,
            **data
        }, accessor=accessor)
if __name__ == "__main__":
    from datetime import timedelta

    # Fetch logs from the last hour
    start_time = datetime.now() - timedelta(hours=1)
    for g in LogAccessor().log_groups():
        for s in [x for x in g.log_streams() if start_time < x.lastEventTimestamp_to_datetime]:
            for e in [x for x in s.log_events() if start_time < x.timestamp_to_datetime]:
                print(e.named_format("{message}"))
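# The format string can also pull in other mapped fields; an illustrative
# variant of the loop body above (uses the default time_format):
#   print(e.named_format("{logGroupName} {logStreamName} {timestamp_by_strftime} {message}"))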