Created
October 24, 2013 01:11
-
-
Save klizhentas/7129659 to your computer and use it in GitHub Desktop.
Aggregator that collects events from keen.io and flushes them in batches
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Aggregates events in batches and flushes them after max batch size | |
have been achieved or certain amount of seconds has passed | |
""" | |
import logging | |
from threading import Lock | |
from datetime import datetime | |
log = logging.getLogger(__name__) | |
# Module-wide Aggregator instance; stays None until init() is called.
aggregator = None

# A buffer is due for flushing once it holds at least this many events ...
BATCH_SIZE = 100

# ... or once more than this many seconds have passed since its last flush.
FLUSH_SECONDS = 3
def init():
    """Install a fresh, empty ``Aggregator`` as the module-wide instance.

    Rebinds the module-level ``aggregator``; anything the previous instance
    had buffered is no longer reachable through this module.
    """
    global aggregator
    aggregator = Aggregator()
class Aggregator(object):
    """Buffers events per event name and hands them back in batches.

    The methods here do no locking themselves. ``lock`` is created so that
    callers can serialize access to the buffers.
    """

    def __init__(self):
        # For callers to hold around add/flush/unflush; never acquired by
        # the methods of this class.
        self.lock = Lock()
        # Maps event name -> per-name buffer (an ``_Events`` instance).
        self.events = {}

    def add(self, event_name, event):
        """Append ``event`` to the buffer for ``event_name``.

        The buffer is created the first time a name is seen.
        """
        events = self.events.get(event_name)
        if events is None:
            events = _Events()
            self.events[event_name] = events
        events.add(event)

    def flush(self):
        """Drain every buffer that reports it is due.

        Returns:
            dict mapping event name -> non-empty list of drained events.
            Names whose buffer is not due, or is due but holds nothing,
            are left out, so an empty dict means "nothing to send".
        """
        to_flush = {}
        # items() rather than iteritems(): works on both Python 2 and 3.
        for event_name, events in self.events.items():
            if not events.should_flush():
                continue
            # Always drain a due buffer, even an empty one, so the buffer's
            # own flush bookkeeping still runs.
            items = events.flush()
            # A buffer can come due on elapsed time alone while holding
            # nothing; reporting it would make callers ship an empty batch.
            if items:
                to_flush[event_name] = items
        return to_flush

    def unflush(self, values):
        """Put previously flushed events back into their buffers.

        Args:
            values: dict mapping event name -> iterable of events, i.e. the
                shape returned by ``flush``.
        """
        for event_name, events in values.items():
            for event in events:
                self.add(event_name, event)
class _Events(object):
    """Pending events for one event name, plus the time of the last drain."""

    def __init__(self):
        self.items = []
        # A new buffer counts as "just flushed", so the time window starts
        # at creation.
        self.last_flush = _now()

    def add(self, event):
        """Append ``event`` to the pending items."""
        self.items.append(event)

    def should_flush(self):
        """Return True when the buffer is due to be drained.

        Due means: at least ``BATCH_SIZE`` items are pending, or more than
        ``FLUSH_SECONDS`` seconds have passed since the last drain. Each
        check logs its outcome at INFO level.
        """
        pending = len(self.items)
        if pending >= BATCH_SIZE:
            log.info("Should flush items size: {}".format(pending))
            return True
        log.info("Should not flush items size: {}".format(pending))

        elapsed = (_now() - self.last_flush).total_seconds()
        overdue = elapsed > FLUSH_SECONDS
        if overdue:
            log.info("Should flush diff_seconds: {}".format(elapsed))
        else:
            log.info("Should not flush diff_seconds: {}".format(elapsed))
        return overdue

    def flush(self):
        """Hand back all pending items and restart the time window."""
        drained, self.items = self.items, []
        self.last_flush = _now()
        return drained
def add(event_name, event):
    """Buffer ``event`` under ``event_name`` on the module-wide aggregator.

    Holds the aggregator's lock for the duration of the call. ``init()``
    must have run first; until then ``aggregator`` is ``None``.
    """
    agg = aggregator
    with agg.lock:
        agg.add(event_name, event)
def flush():
    """Drain the due buffers of the module-wide aggregator under its lock.

    Returns whatever ``aggregator.flush()`` returns: a dict keyed by event
    name. ``init()`` must have run first.
    """
    agg = aggregator
    with agg.lock:
        flushed = agg.flush()
    return flushed
def unflush(values):
    """Return previously flushed events to the module-wide aggregator.

    ``values`` has the shape produced by ``flush()``. The aggregator's lock
    is held while the events are re-added. ``init()`` must have run first.
    """
    agg = aggregator
    with agg.lock:
        outcome = agg.unflush(values)
    return outcome
def _now():
    """Return the current UTC time as a naive ``datetime``.

    All time reads in this module go through this function, which gives a
    single place to substitute the clock.
    """
    current = datetime.utcnow()
    return current
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from nose.tools import eq_, ok_ | |
from mock import patch | |
from keend import aggregate | |
@patch.object(aggregate, '_now')
def test_add_when_time_is_out(get_now):
    """Small buffers are flushed only after FLUSH_SECONDS has elapsed."""
    # Start from a fresh aggregator: the module-level instance is None until
    # init() runs, and buffers left behind by other tests would otherwise
    # leak into this one.
    aggregate.init()

    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("opened", {"e": 3})
    aggregate.add("opened", {"e": 4})

    # One second later: still inside the time window, nothing is due.
    get_now.return_value = now + timedelta(seconds=1)
    values = aggregate.flush()
    eq_(values, {})

    # Past the window: both buffers are handed back.
    get_now.return_value = now + timedelta(
        seconds=aggregate.FLUSH_SECONDS + 1)
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 1}, {'e': 2}],
        'opened': [{'e': 3}, {'e': 4}],
    }
    eq_(values, expected)

    # make sure that second flush does not include events
    # that are already flushed
    aggregate.add("clicked", {"e": 5})
    aggregate.add("clicked", {"e": 6})
    aggregate.add("opened", {"e": 7})
    aggregate.add("opened", {"e": 8})

    get_now.return_value = now + timedelta(
        seconds=aggregate.FLUSH_SECONDS * 2 + 2)
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 5}, {'e': 6}],
        'opened': [{'e': 7}, {'e': 8}],
    }
    eq_(values, expected)
@patch.object(aggregate, '_now')
@patch.object(aggregate, 'BATCH_SIZE', 3)
def test_add_batch_size_exceeded(get_now):
    """A buffer is flushed as soon as it reaches BATCH_SIZE, time aside."""
    # Start from a fresh aggregator: the module-level instance is None until
    # init() runs, and buffers left behind by other tests would otherwise
    # leak into this one.
    aggregate.init()

    # The clock stays frozen, so only the size rule can trigger a flush.
    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("opened", {"e": 3})
    aggregate.add("opened", {"e": 4})

    # Two events per name: below the patched batch size of 3.
    values = aggregate.flush()
    eq_(values, {})

    aggregate.add("clicked", {"e": 5})
    values = aggregate.flush()
    expected = {
        'clicked': [{'e': 1}, {'e': 2}, {'e': 5}],
    }
    eq_(values, expected)

    # make sure it won't include events that are already flushed
    aggregate.add("opened", {"e": 6})
    values = aggregate.flush()
    expected = {
        'opened': [{'e': 3}, {'e': 4}, {'e': 6}],
    }
    eq_(values, expected)
@patch.object(aggregate, '_now')
@patch.object(aggregate, 'BATCH_SIZE', 3)
def test_unflush(get_now):
    """Events put back with unflush() come out again on the next flush()."""
    # Start from a fresh aggregator: the module-level instance is None until
    # init() runs, and buffers left behind by other tests would otherwise
    # leak into this one.
    aggregate.init()

    now = datetime(2010, 1, 1, 2, 3, 4, 0)
    get_now.return_value = now

    aggregate.add("clicked", {"e": 1})
    aggregate.add("clicked", {"e": 2})
    aggregate.add("clicked", {"e": 3})

    aggregate.add("opened", {"e": 4})
    aggregate.add("opened", {"e": 5})
    aggregate.add("opened", {"e": 6})
    aggregate.add("opened", {"e": 7})

    # Both buffers have reached the patched batch size of 3.
    values = aggregate.flush()
    ok_(values)

    aggregate.unflush(values)
    values2 = aggregate.flush()
    eq_(values, values2)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import keend | |
from keend import aggregate | |
from keend import timeutils | |
log = logging.getLogger(__name__) | |
def on_event(event):
    """Stamp an incoming event, buffer it, and ship any batches that are due.

    ``event`` is read through its ``'timestamp'`` and ``'event'`` keys and is
    mutated: a ``'keen'`` entry holding the converted timestamp is added.
    If sending a batch fails, the batch is put back into the aggregator.
    Any ``Exception`` is logged rather than propagated.
    """
    try:
        keen_timestamp = timeutils.convert_timestamp(event['timestamp'])
        event['keen'] = {'timestamp': keen_timestamp}
        aggregate.add(event['event'], event)

        pending = aggregate.flush()
        if not pending:
            log.info("Not flushing")
            return

        log.info("Flushing events")
        try:
            keend.client.add_events(pending)
        except Exception:
            # Keep the batch for a later attempt instead of dropping it.
            log.exception("Failed to flush, unflushing!")
            aggregate.unflush(pending)
    except Exception:
        log.exception("Failure in event handler!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment