Skip to content

Instantly share code, notes, and snippets.

@wheaties
Last active August 29, 2015 13:57
Show Gist options
  • Select an option

  • Save wheaties/9500264 to your computer and use it in GitHub Desktop.

Select an option

Save wheaties/9500264 to your computer and use it in GitHub Desktop.
Tornado and Cassandra Inserter Event Catcher
from tornado.web import RequestHandler
#Really not much to this. Json validation and parsing can be done using another library.
class EventRequestHandler(ReqestHandler):
SUPPORTED_METHODS = ['POST']
def initialize(self, inserter): #Really? By side-effect? Ugh...
self.act = inserter
def post(self):
values = self._extract_json_values()
self.act.insert(*values)
import logging
from Queue import Queue #queue in python 3
from threading import Event #hmm... this needed?
insert_logger = logging.getLogger('async_insert')
insert_logger.setLevel(logging.INFO)
def handle_err(err):
insert_logger.warning('Failed to insert due to %s', err)
#Designed to work in a high write environment. Chained callbacks for best performance and fast fail/stop when error
#encountered. Next insert should re-up the writing. Potential loss of failed write. Some guarantee on order of write
#preservation.
class CappedQueueInserter(object):
def __init__(self, session, max_count=0):
self.__queue = Queue(max_count)
self.__session = session
self.__started = Event()
@property
def started(self):
return self.__started.is_set()
def insert(self, bound_statement):
if not self.started:
self._begin(bound_statement)
else:
self._enqueue(bound_statement)
def _begin(self, bound_statement):
def callback():
try:
bound = self.__queue.get(True) #block until an item is added to the queue
future = self.__session.execute_async(bound)
future.add_callbacks(callback, handle_err)
except:
self.__started.clear()
self.__started.set()
future = self.__session.execute_async(bound_statement)
future.add_callbacks(callback, handle_err)
def _enqueue(self, bound_statement):
self.__queue.put(bound_statement, True)
#Separate insert statement binding from the insertion loop
class InsertEnqueue(object):
def __init__(self, prepared_query, insert, consistency_level=None):
self.__statement = prepared_query
self.__level = consistency_level
self.__sink = insert
def insert(self, *args):
bound = self.bind(*args)
self.__sink.insert(bound)
@property
def consistency_level(self):
return self.__level or self.__statement.consistency_level
@consistency_level.setter
def adjust_level(self, value):
if value:
self.__level = value
def bind(self, *args):
bound = self.__statement.bind(*args)
bound.consistency_level = self.consistency_level
return bound
import logging
from Queue import Queue #queue in python 3
from threading import Event #hmm... this needed?
insert_logger = logging.getLogger('async_insert')
insert_logger.setLevel(logging.INFO)
def handle_err(err):
insert_logger.warning('Failed to insert due to %s', err)
#Designed to work in a high write environment. Chained callbacks for best performance and fast fail/stop when error
#encountered. Next insert should re-up the writing. Potential loss of failed write. Some guarantee on order or write
#preservation.
class CappedQueueInserter(object):
def __init__(self, query, session, consistency_level=None, max_count=0):
self.__queue = Queue(max_count)
self.__statement = session.prepare(query)
self.__consistency_level = consistency_level
self.__session = session
self.__started = Event()
@property
def started(self):
return self.__started.is_set()
def insert(self, *args):
if not self.started:
self._begin(*args)
else:
self._enqueue(*args)
def _begin(self, *args):
def callback()::
try:
bound = self.__queue.get(True) #block until an item is added to the queue
future = self.__session.execute_async(bound)
future.add_callbacks(callback, handle_err)
except:
self.__started.clear()
self.__started.set()
bound = self._bind(*args)
future = self.__session.execute_async(bound)
future.add_callbacks(callback, handle_err)
def _enqueue(self, *args):
bound = self._bind(*args)
self.__queue.put(bound, True) #blocks thread until can be added to queue
def _bind(self, *args):
bound = self.__statement.bind(*args)
if self.__consistency_level:
bound.consistency_level = self.__consistency_level
return bound
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment