Skip to content

Instantly share code, notes, and snippets.

@klokan
Created October 30, 2015 18:14
Show Gist options
  • Select an option

  • Save klokan/64671924f7a5c9c602c2 to your computer and use it in GitHub Desktop.

Select an option

Save klokan/64671924f7a5c9c602c2 to your computer and use it in GitHub Desktop.
CloudWatch Logs - boto python logging
diff --git a/XXX-tiler.py b/XXX-tiler.py
index b18d99d..9a01a53 100644
--- a/XXX-tiler.py
+++ b/XXX-tiler.py
@@ -1,5 +1,6 @@
import json
import logging
+import logging.handlers
import requests
import sqlite3
@@ -18,6 +19,7 @@
from xml.etree import ElementTree
from zipfile import ZipFile
+from cloudwrapper.cloudwatchlogs import CloudWatchLogsConnection
from cloudwrapper.s3 import S3Connection
from cloudwrapper.sqs import SqsConnection
@@ -46,21 +48,30 @@ def main():
publish_parser.set_defaults(handler=publish)
args = main_parser.parse_args()
- logging.basicConfig(
- filename=args.log,
- format='%(asctime)s:%(levelname)s: %(message)s',
- level=logging.INFO)
+
# Configure the root logger by hand so the formatter object can be shared
# with any handler that is attached later on.
formatter = logging.Formatter('%(asctime)s:%(levelname)s: %(message)s')
# FileHandler lives in the top-level `logging` package; `logging.handlers`
# has no attribute of that name. Without a log path, fall back to a stream
# handler, which is what logging.basicConfig(filename=None) does.
if args.log:
    handler = logging.FileHandler(args.log)
else:
    handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
+
logging.info('starting')
try:
logging.info('reading configuration file %s', args.config)
with open(args.config, 'rb') as file_:
config = json.load(file_)
+ cloudwatch_logs = CloudWatchLogsConnection(**config['aws'])
+ handler = cloudwatch_logs.handler('airbus-tiler', args.handler.__name__)
+ handler.setLevel(logging.INFO)
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
args.handler(args, config)
- except KeyboardInterrupt:
- logging.info('terminating')
except Exception:
logging.exception('exception occured')
exit(1)
+ finally:
+ logging.info('terminating')
def discover(args, config):
@@ -89,6 +100,7 @@ def discover(args, config):
conn.commit()
tasked.remove(source_id)
+ flush_log()
conn.close()
@@ -125,6 +137,7 @@ def publish(args, config):
conn.commit()
queue.task_done()
+ flush_log()
conn.close()
@@ -159,6 +172,7 @@ def prepare(args, config):
'satellite': satellite,
})
tasks.task_done()
+ flush_log()
def tile(args, config):
@@ -188,6 +202,7 @@ def tile(args, config):
feature['properties']['tilingDate'] = timestamp
results.put(feature)
tasks.task_done()
+ flush_log()
class Schedule(object):
@@ -515,5 +530,10 @@ def download(bucket, listing):
bucket.get(member, member)
def flush_log():
    """Call ``flush()`` on every handler attached to the root logger."""
    root_logger = logging.getLogger()
    for attached in root_logger.handlers:
        attached.flush()
+
+
if __name__ == '__main__':
main()
diff --git a/cloudwrapper/cloudwatchlogs.py b/cloudwrapper/cloudwatchlogs.py
new file mode 100644
index 0000000..3ebd1ec
--- /dev/null
+++ b/cloudwrapper/cloudwatchlogs.py
@@ -0,0 +1,58 @@
+"""Amazon CloudWatch Logs."""
+
+import logging
+
+from boto.logs import connect_to_region
+from boto.logs.exceptions import \
+ InvalidSequenceTokenException, \
+ ResourceAlreadyExistsException
+
+
class CloudWatchLogsConnection(object):
    """Wrapper around a boto CloudWatch Logs connection."""

    def __init__(self, region, key=None, secret=None):
        """Open the connection for ``region`` via ``connect_to_region``.

        ``key`` and ``secret`` are forwarded as ``aws_access_key_id`` and
        ``aws_secret_access_key``; both default to None.
        """
        credentials = {
            'aws_access_key_id': key,
            'aws_secret_access_key': secret,
        }
        self.connection = connect_to_region(region, **credentials)

    def handler(self, group, stream):
        """Return a logging ``Handler`` bound to ``group`` / ``stream``.

        The log group and then the log stream are created first; a
        ``ResourceAlreadyExistsException`` from either call is ignored so
        that existing resources are simply reused.
        """
        creation_steps = (
            ('create_log_group', (group,)),
            ('create_log_stream', (group, stream)),
        )
        for method_name, names in creation_steps:
            try:
                getattr(self.connection, method_name)(*names)
            except ResourceAlreadyExistsException:
                # Already present: nothing to create.
                pass
        return Handler(self.connection, group, stream)
+
+
class Handler(logging.Handler):
    """Logging handler that buffers records and uploads them on ``flush``.

    ``emit`` only queues the formatted record in memory; nothing is sent
    until ``flush`` is called, which uploads the queue through the
    connection's ``put_log_events``.
    """

    # Per-request limits documented for the CloudWatch Logs PutLogEvents API.
    MAX_BATCH_EVENTS = 10000
    MAX_BATCH_BYTES = 1048576
    # Bytes the service adds to every event when computing the batch size.
    EVENT_OVERHEAD_BYTES = 26

    def __init__(self, connection, group, stream, *args, **kwargs):
        """Bind the handler to a connection, a log group and a log stream.

        Extra positional and keyword arguments are passed on to
        ``logging.Handler.__init__``.
        """
        super(Handler, self).__init__(*args, **kwargs)
        self.connection = connection
        self.group = group
        self.stream = stream
        # Sequence token for the next upload; None until a response (or an
        # InvalidSequenceTokenException) supplies one.
        self.token = None
        # Queued events, each {'timestamp': <ms since epoch>, 'message': <str>}.
        self.events = []

    def emit(self, record):
        """Queue ``record`` for the next ``flush``."""
        self.events.append({
            'timestamp': int(record.created * 1000),
            'message': self.format(record),
        })

    def flush(self):
        """Upload all queued events and drop those that were accepted.

        Events are sent in chronological order and split into batches that
        respect ``MAX_BATCH_EVENTS`` and ``MAX_BATCH_BYTES``. If an upload
        raises, the events of already accepted batches are gone from the
        queue while the remaining ones stay queued for the next call.
        """
        if not self.events:
            return
        # PutLogEvents requires chronological order within a batch; the sort
        # is stable, so events sharing a timestamp keep their emit order.
        self.events.sort(key=lambda event: event['timestamp'])
        while self.events:
            batch = self._next_batch()
            self._send(batch)
            del self.events[:len(batch)]

    def _next_batch(self):
        """Return the longest queue prefix that fits the per-request limits."""
        batch = []
        size = 0
        for event in self.events[:self.MAX_BATCH_EVENTS]:
            message = event['message']
            if not isinstance(message, bytes):
                message = message.encode('utf-8')
            size += len(message) + self.EVENT_OVERHEAD_BYTES
            # Always take at least one event so the loop in flush() makes
            # progress even when a single message exceeds the byte limit.
            if batch and size > self.MAX_BATCH_BYTES:
                break
            batch.append(event)
        return batch

    def _send(self, batch):
        """Upload one batch, retrying once with the expected sequence token."""
        try:
            response = self.connection.put_log_events(
                self.group, self.stream, batch, self.token)
        except InvalidSequenceTokenException as ex:
            # The error body carries the token the service expected.
            self.token = ex.body['expectedSequenceToken']
            response = self.connection.put_log_events(
                self.group, self.stream, batch, self.token)
        self.token = response['nextSequenceToken']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment