Created
October 30, 2015 18:14
-
-
Save klokan/64671924f7a5c9c602c2 to your computer and use it in GitHub Desktop.
CloudWatch Logs - boto python logging
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/XXX-tiler.py b/XXX-tiler.py | |
| index b18d99d..9a01a53 100644 | |
| --- a/XXX-tiler.py | |
| +++ b/XXX-tiler.py | |
| @@ -1,5 +1,6 @@ | |
| import json | |
| import logging | |
| +import logging.handlers | |
| import requests | |
| import sqlite3 | |
| @@ -18,6 +19,7 @@ | |
| from xml.etree import ElementTree | |
| from zipfile import ZipFile | |
| +from cloudwrapper.cloudwatchlogs import CloudWatchLogsConnection | |
| from cloudwrapper.s3 import S3Connection | |
| from cloudwrapper.sqs import SqsConnection | |
| @@ -46,21 +48,30 @@ def main(): | |
| publish_parser.set_defaults(handler=publish) | |
| args = main_parser.parse_args() | |
| - logging.basicConfig( | |
| - filename=args.log, | |
| - format='%(asctime)s:%(levelname)s: %(message)s', | |
| - level=logging.INFO) | |
| + | |
| + formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s') | |
| + handler = logging.handlers.FileHandler(args.log) | |
| + handler.setFormatter(formatter) | |
| + logger = logging.getLogger() | |
| + logger.setLevel(logging.INFO) | |
| + logger.addHandler(handler) | |
| + | |
| logging.info('starting') | |
| try: | |
| logging.info('reading configuration file %s', args.config) | |
| with open(args.config, 'rb') as file_: | |
| config = json.load(file_) | |
| + cloudwatch_logs = CloudWatchLogsConnection(**config['aws']) | |
| + handler = cloudwatch_logs.handler('airbus-tiler', args.handler.__name__) | |
| + handler.setLevel(logging.INFO) | |
| + handler.setFormatter(formatter) | |
| + logger.addHandler(handler) | |
| args.handler(args, config) | |
| - except KeyboardInterrupt: | |
| - logging.info('terminating') | |
| except Exception: | |
| logging.exception('exception occured') | |
| exit(1) | |
| + finally: | |
| + logging.info('terminating') | |
| def discover(args, config): | |
| @@ -89,6 +100,7 @@ def discover(args, config): | |
| conn.commit() | |
| tasked.remove(source_id) | |
| + flush_log() | |
| conn.close() | |
| @@ -125,6 +137,7 @@ def publish(args, config): | |
| conn.commit() | |
| queue.task_done() | |
| + flush_log() | |
| conn.close() | |
| @@ -159,6 +172,7 @@ def prepare(args, config): | |
| 'satellite': satellite, | |
| }) | |
| tasks.task_done() | |
| + flush_log() | |
| def tile(args, config): | |
| @@ -188,6 +202,7 @@ def tile(args, config): | |
| feature['properties']['tilingDate'] = timestamp | |
| results.put(feature) | |
| tasks.task_done() | |
| + flush_log() | |
| class Schedule(object): | |
| @@ -515,5 +530,10 @@ def download(bucket, listing): | |
| bucket.get(member, member) | |
| +def flush_log(): | |
| + for handler in logging.getLogger().handlers: | |
| + handler.flush() | |
| + | |
| + | |
| if __name__ == '__main__': | |
| main() | |
| diff --git a/cloudwrapper/cloudwatchlogs.py b/cloudwrapper/cloudwatchlogs.py | |
| new file mode 100644 | |
| index 0000000..3ebd1ec | |
| --- /dev/null | |
| +++ b/cloudwrapper/cloudwatchlogs.py | |
| @@ -0,0 +1,58 @@ | |
| +"""Amazon CloudWatch Logs.""" | |
| + | |
| +import logging | |
| + | |
| +from boto.logs import connect_to_region | |
| +from boto.logs.exceptions import \ | |
| + InvalidSequenceTokenException, \ | |
| + ResourceAlreadyExistsException | |
| + | |
| + | |
| +class CloudWatchLogsConnection(object): | |
| + | |
| + def __init__(self, region, key=None, secret=None): | |
| + self.connection = connect_to_region( | |
| + region, | |
| + aws_access_key_id=key, | |
| + aws_secret_access_key=secret) | |
| + | |
| + def handler(self, group, stream): | |
| + try: | |
| + self.connection.create_log_group(group) | |
| + except ResourceAlreadyExistsException: | |
| + pass | |
| + try: | |
| + self.connection.create_log_stream(group, stream) | |
| + except ResourceAlreadyExistsException: | |
| + pass | |
| + return Handler(self.connection, group, stream) | |
| + | |
| + | |
| +class Handler(logging.Handler): | |
| + | |
| + def __init__(self, connection, group, stream, *args, **kwargs): | |
| + super(Handler, self).__init__(*args, **kwargs) | |
| + self.connection = connection | |
| + self.group = group | |
| + self.stream = stream | |
| + self.token = None | |
| + self.events = [] | |
| + | |
| + def emit(self, record): | |
| + self.events.append({ | |
| + 'timestamp': int(record.created * 1000), | |
| + 'message': self.format(record), | |
| + }) | |
| + | |
| + def flush(self): | |
| + if not self.events: | |
| + return | |
| + try: | |
| + response = self.connection.put_log_events( | |
| + self.group, self.stream, self.events, self.token) | |
| + except InvalidSequenceTokenException as ex: | |
| + self.token = ex.body['expectedSequenceToken'] | |
| + response = self.connection.put_log_events( | |
| + self.group, self.stream, self.events, self.token) | |
| + self.token = response['nextSequenceToken'] | |
| + self.events = [] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment