Last active
October 26, 2018 11:04
-
-
Save h-mochizuki/66c648324478e4808eb9c1f84c31350f to your computer and use it in GitHub Desktop.
AWSからログを取得するスクリプト
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| import boto3 | |
| from datetime import datetime | |
def datetime2timestamp(dt: datetime) -> int:
    """Convert *dt* to whole milliseconds since the Unix epoch.

    The fractional part below one millisecond is dropped by ``int()``.
    """
    epoch_millis = dt.timestamp() * 1000
    return int(epoch_millis)
def timestamp2datetime(ts: int) -> datetime:
    """Convert a millisecond epoch timestamp to a ``datetime``.

    The result comes from ``datetime.fromtimestamp`` called without a
    ``tz`` argument, so it is a naive datetime in local time.
    """
    epoch_seconds = ts / 1000
    return datetime.fromtimestamp(epoch_seconds)
class LogAccessor:
    """Small convenience wrapper around the boto3 ``logs`` client.

    Each query method is a generator that wraps the raw response
    dictionaries in ``LogGroup`` / ``LogStream`` / ``LogEvent`` objects.
    """

    # Upper bounds for the ``limit`` request parameter, taken from the
    # CloudWatch Logs API reference.
    _MAX_DESCRIBE_LIMIT = 50
    _MAX_EVENTS_LIMIT = 10000

    def __init__(
            self,
            region_name: str = None,
            aws_access_key_id: str = None,
            aws_secret_access_key: str = None
    ):
        """Create the underlying boto3 client.

        Only the arguments that are truthy are forwarded to
        ``boto3.client``; everything else is left to boto3's defaults.
        """
        kwargs = {}
        if region_name:
            kwargs['region_name'] = region_name
        if aws_access_key_id:
            kwargs['aws_access_key_id'] = aws_access_key_id
        if aws_secret_access_key:
            kwargs['aws_secret_access_key'] = aws_secret_access_key
        # Unpacking an empty dict is the same as passing no keyword arguments.
        self.client = boto3.client('logs', **kwargs)

    @staticmethod
    def _clamp(value: int, upper: int) -> int:
        """Force *value* into the inclusive range ``1..upper``."""
        return max(1, min(value, upper))

    def log_groups(
            self,
            name_prefix: str = None,
            limit=None,
            *,
            next_token: str = None
    ):
        """Yield a ``LogGroup`` for every log group returned by the API.

        :param name_prefix: optional ``logGroupNamePrefix`` filter.
        :param limit: when an ``int``, sent as ``limit`` (clamped to 1..50)
            and only a single request is made.  When ``None``, every page
            is fetched by following ``nextToken``.
        :param next_token: token of the page to start from.
        """
        params = {}
        if name_prefix:
            params['logGroupNamePrefix'] = name_prefix
        if isinstance(limit, int):
            params['limit'] = self._clamp(limit, self._MAX_DESCRIBE_LIMIT)
        token = next_token
        while True:
            if token:
                params['nextToken'] = token
            data = self.client.describe_log_groups(**params)
            for log_group in data.get('logGroups') or []:
                yield LogGroup(log_group, self)
            token = data.get('nextToken')
            # A generator's ``return value`` is never seen by the consumer,
            # so further pages are fetched by looping instead of recursing.
            if not token or limit is not None:
                return

    def log_streams(
            self,
            log_group: str,
            name_prefix: str = None,
            order_by: str = 'LastEventTime',
            descending: bool = False,
            limit=None,
            *,
            next_token: str = None
    ):
        """Yield a ``LogStream`` for every stream of *log_group*.

        :param log_group: name of the log group to inspect.
        :param name_prefix: optional ``logStreamNamePrefix`` filter.
            NOTE(review): the API reference states a prefix cannot be
            combined with ``orderBy='LastEventTime'``; callers passing a
            prefix should probably pass ``order_by='LogStreamName'``.
        :param order_by: sent as ``orderBy`` when truthy.
        :param descending: sent as ``descending`` only when truthy.
        :param limit: when an ``int``, sent as ``limit`` (clamped to 1..50)
            and only a single request is made.  When ``None``, every page
            is fetched by following ``nextToken``.
        :param next_token: token of the page to start from.
        """
        params = {}
        if log_group:
            params['logGroupName'] = log_group
        if name_prefix:
            params['logStreamNamePrefix'] = name_prefix
        if order_by:
            params['orderBy'] = order_by
        if descending:
            params['descending'] = descending
        if isinstance(limit, int):
            params['limit'] = self._clamp(limit, self._MAX_DESCRIBE_LIMIT)
        token = next_token
        while True:
            if token:
                params['nextToken'] = token
            data = self.client.describe_log_streams(**params)
            for log_stream in data.get('logStreams') or []:
                yield LogStream(log_group, log_stream, self)
            token = data.get('nextToken')
            if not token or limit is not None:
                return

    def log_events(
            self,
            log_group: str,
            log_stream: str,
            start_time=None,
            end_time=None,
            limit=None,
            start_from_head: bool = False,
            *,
            next_token: str = None
    ):
        """Yield a ``LogEvent`` for each event of one ``get_log_events`` call.

        Only a single request is made; no further pages are followed.

        :param log_group: name of the log group.
        :param log_stream: name of the log stream.
        :param start_time: ``int`` epoch milliseconds, sent as ``startTime``
            (values below 1 are raised to 1); other types are ignored.
        :param end_time: ``int`` epoch milliseconds, sent as ``endTime``
            (values below 1 are raised to 1); other types are ignored.
        :param limit: when an ``int``, sent as ``limit`` clamped to 1..10000.
        :param start_from_head: sent as ``startFromHead`` only when truthy.
        :param next_token: sent as ``nextToken`` when truthy.
        """
        params = {}
        if log_group:
            params['logGroupName'] = log_group
        if log_stream:
            params['logStreamName'] = log_stream
        if isinstance(start_time, int):
            params['startTime'] = max(start_time, 1)
        if isinstance(end_time, int):
            params['endTime'] = max(end_time, 1)
        if isinstance(limit, int):
            params['limit'] = self._clamp(limit, self._MAX_EVENTS_LIMIT)
        if start_from_head:
            params['startFromHead'] = start_from_head
        if next_token:
            params['nextToken'] = next_token
        data = self.client.get_log_events(**params)
        for event in data.get('events') or []:
            yield LogEvent(log_group, log_stream, event, self)
class AutoMapping:
    """Base class that exposes entries of a response dict as attributes.

    Keys of ``mapping`` that match an attribute already present on the
    instance (normally a class-level default declared by a subclass) are
    copied onto the instance.  In addition, for every such attribute
    ``<name>`` two virtual attributes are available:

    * ``<name>_to_datetime`` -- the value converted with
      ``timestamp2datetime`` when it is a positive ``int``, otherwise
      ``datetime.min``;
    * ``<name>_by_strftime`` -- the bound ``strftime`` method of that
      datetime.
    """

    def __init__(self, mapping: dict = None, accessor: "LogAccessor" = None, *args, **kwargs):
        """Store *mapping* and copy its known keys onto the instance.

        :param mapping: raw data; a fresh empty dict is used when omitted so
            that instances never share one default dictionary.
        :param accessor: object used for follow-up queries; a new
            ``LogAccessor()`` is created when omitted.
        """
        self.mapping = {} if mapping is None else mapping
        self.accessor = accessor if accessor is not None else LogAccessor()
        for k, v in self.mapping.items():
            if hasattr(self, k):
                try:
                    setattr(self, k, v)
                except AttributeError:
                    # Read-only attribute (e.g. a property without setter).
                    pass

    def __getattr__(self, key: str):
        """Resolve the virtual ``*_to_datetime`` / ``*_by_strftime`` names.

        Only called when normal attribute lookup fails.

        :raises AttributeError: for any other name, or when the base
            attribute does not exist.
        """
        # Both suffixes are exactly 12 characters long.
        base = key[0:-12]
        if key.endswith('_by_strftime') and hasattr(self, base):
            return getattr(self, '{}_to_datetime'.format(base)).strftime
        if key.endswith('_to_datetime') and hasattr(self, base):
            _timestamp = getattr(self, base)
            if isinstance(_timestamp, int) and _timestamp > 0:
                return timestamp2datetime(_timestamp)
            return datetime.min
        raise AttributeError(
            '{!r} object has no attribute {!r}'.format(type(self).__name__, key)
        )

    def __str__(self) -> str:
        return str(self.mapping)

    def named_format(self, form: str, time_format: str = "%Y-%m-%dT%H:%M:%S.%f"):
        """Format *form* with the mapping entries as named fields.

        For each key ``k`` the fields ``k``, ``k_to_datetime`` and
        ``k_by_strftime`` (rendered with *time_format*) are offered; the two
        derived fields are left out when they cannot be computed.
        """
        param = {}
        for k, v in self.mapping.items():
            param[k] = v
            try:
                moment = getattr(self, k + "_to_datetime")
            except AttributeError:
                # No matching attribute, hence no derived fields for this key.
                continue
            param[k + "_to_datetime"] = moment
            try:
                param[k + "_by_strftime"] = moment.strftime(time_format)
            except (AttributeError, ValueError):
                # Best effort: skip values that cannot be rendered.
                pass
        return form.format(**param)
class LogGroup(AutoMapping):
    """One entry of a ``describe_log_groups`` response.

    The class-level attributes below are the defaults; the inherited
    constructor overwrites them with matching keys of the response dict.
    """

    arn: str = None
    creationTime: int = None
    # A key identifier is a string, not a number.
    kmsKeyId: str = None
    logGroupName: str = None
    metricFilterCount: int = 0
    retentionInDays: int = 0
    storedBytes: int = 0

    def log_streams(
            self,
            name_prefix: str = None,
            order_by: str = 'LastEventTime',
            descending: bool = False,
            limit=None,
    ):
        """Return the streams of this group via ``accessor.log_streams``.

        All arguments are forwarded unchanged, together with this group's
        ``logGroupName``.
        """
        return self.accessor.log_streams(
            log_group=self.logGroupName,
            name_prefix=name_prefix,
            order_by=order_by,
            descending=descending,
            limit=limit
        )
class LogStream(AutoMapping):
    """One entry of a ``describe_log_streams`` response, plus its group name.

    The class-level attributes below are the defaults; the constructor
    overwrites them with matching keys of the response dict.
    """

    logGroupName: str = None
    logStreamName: str = None
    creationTime: int = 0
    firstEventTimestamp: int = 0
    lastEventTimestamp: int = 0
    lastIngestionTime: int = 0
    uploadSequenceToken: str = None
    arn: str = None
    storedBytes: int = 0

    def __init__(self, log_group: str, data: dict = None, accessor: LogAccessor = None):
        """Build the mapping from *log_group* and the raw stream *data*.

        :param log_group: name of the owning log group.
        :param data: raw stream dictionary; treated as empty when omitted.
            A ``logGroupName`` key inside *data* takes precedence.
        :param accessor: accessor used for follow-up queries.
        """
        super().__init__(mapping={
            'logGroupName': log_group,
            **(data or {})
        }, accessor=accessor)

    def log_events(
            self,
            start_time=None,
            end_time=None,
            limit=None,
            start_from_head: bool = False,
            *, next_token: str = None
    ):
        """Return the events of this stream via ``accessor.log_events``.

        All arguments, including *next_token*, are forwarded unchanged
        together with this stream's group and stream names.
        """
        return self.accessor.log_events(
            log_group=self.logGroupName,
            log_stream=self.logStreamName,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
            start_from_head=start_from_head,
            # Previously accepted but silently dropped.
            next_token=next_token
        )
class LogEvent(AutoMapping):
    """One entry of a ``get_log_events`` response, plus group/stream names.

    The class-level attributes below are the defaults; the constructor
    overwrites them with matching keys of the response dict.
    """

    logGroupName: str = None
    logStreamName: str = None
    message: str = None
    timestamp: int = 0
    ingestionTime: int = 0

    def __init__(self, log_group: str, log_stream: str, data: dict = None, accessor: LogAccessor = None):
        """Build the mapping from the names and the raw event *data*.

        :param log_group: name of the owning log group.
        :param log_stream: name of the owning log stream.
        :param data: raw event dictionary; treated as empty when omitted.
            Keys inside *data* take precedence over the two names.
        :param accessor: accessor used for follow-up queries.
        """
        super().__init__(mapping={
            'logGroupName': log_group,
            'logStreamName': log_stream,
            **(data or {})
        }, accessor=accessor)
if __name__ == "__main__":
    from datetime import timedelta

    # Print the message of every log event from the last hour.
    start_time = datetime.now() - timedelta(hours=1)
    start_millis = datetime2timestamp(start_time)
    for g in LogAccessor().log_groups():
        # Iterate the generators directly instead of building throwaway lists.
        for s in g.log_streams():
            if not start_time < s.lastEventTimestamp_to_datetime:
                continue
            # The lower bound is also sent to the service so older events are
            # not transferred; the local comparison keeps the exact cut-off.
            for e in s.log_events(start_time=start_millis):
                if start_time < e.timestamp_to_datetime:
                    print(e.named_format("{message}"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment