Created
July 15, 2013 02:05
-
-
Save jehiah/5997041 to your computer and use it in GitHub Desktop.
Buffered NSQ reader (python)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tornado.ioloop | |
import logging | |
import time | |
class BufferedMessageHandler(object): | |
""" | |
Usage: | |
buffered_handler = BufferedMessageHandler(my_handler) | |
r = Reader(message_handler=buffered_handler) | |
buffered_handler.reader = r | |
run() | |
This buffers messages and calls the handler with [(message, nsq_msg), (message, nsq_msg), ...] | |
""" | |
def __init__(self, message_handler, max_queue_time=25, io_loop=None): | |
assert callable(message_handler) | |
self.message_handler = message_handler | |
self.max_queue_time = max_queue_time | |
self.queue = [] | |
self.queue_start_time = None | |
self.io_loop = io_loop or tornado.ioloop.IOLoop.instance() | |
self.flush_timer = None | |
def __call__(self, message, nsq_msg): | |
nsq_msg.enable_async() | |
# start a timer to flush this batch even if we don't get more messages | |
if not self.queue: | |
self.queue_start_time = time.time() | |
deadline = time.time() + self.max_queue_time | |
self.flush_timer = self.io_loop.add_timeout(deadline, self.flush_callback) | |
self.queue.append((message, nsq_msg)) | |
if self.reader.is_starved(): | |
self.flush() | |
def flush_callback(self): | |
self.flush_timer = None | |
self.flush() | |
def flush(self): | |
if self.flush_timer: | |
self.io_loop.remove_timeout(self.flush_timer) | |
self.flush_timer = None | |
data = self.queue | |
if not data: | |
return | |
self.queue = [] | |
try: | |
self.message_handler(data) | |
except Exception: | |
logging.exception("uncaught exception proccessing messages. requeueing") | |
for message, nsq_msg in data: | |
nsq_msg.requeue() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment