# Buffered, multithreaded insertion of documents into MongoDB as embedded runs.
from collections import defaultdict
from threading import Condition, Lock, Thread

from pymongo import MongoClient, UpdateOne


class DocumentBuffer():
    # TODO: insert should block if the buffer is full.
    def __init__(self):
        self.empty = True
        self.AppendBuffer = defaultdict(list)
        # EventBuffer[doc_type][descriptor][field][key] -> list of values.
        self.EventBuffer = self._make_event_buffer()
        self.mutex = Lock()
        # Condition wraps the mutex so dump() can sleep until insert() signals.
        self.dump_block = Condition(self.mutex)

    @staticmethod
    def _make_event_buffer():
        # Four nested defaultdicts: any new path ends in an appendable list.
        return defaultdict(
            lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))

    def insert(self, doc_type, doc):
        # notify() is only legal while the condition's lock is held, so the
        # signal happens inside the with-block rather than after release.
        with self.dump_block:
            if doc_type == 'event':
                self.__embed(doc_type, doc)
            else:
                self.__append(doc_type, doc)
            self.empty = False
            self.dump_block.notify()

    def dump(self):
        # wait() must also be called with the lock held; it releases the lock
        # while blocking and reacquires it before returning, so the buffer
        # swap below is atomic with respect to insert().
        with self.dump_block:
            while self.empty:
                self.dump_block.wait()
            AppendBuffer_dump = self.AppendBuffer
            EventBuffer_dump = self.EventBuffer
            self.AppendBuffer = defaultdict(list)
            self.EventBuffer = self._make_event_buffer()
            self.empty = True
        return AppendBuffer_dump, EventBuffer_dump

    def __append(self, doc_type, doc):
        self.AppendBuffer[doc_type].append(doc)

    def __embed(self, doc_type, doc):
        # Group event fields column-wise under the event's descriptor.
        event = self.EventBuffer[doc_type][doc['descriptor']]
        event['time']['time'].append(doc['time'])
        event['seq_num']['seq_num'].append(doc['seq_num'])
        for key, value in doc['data'].items():
            event['data'][key].append(value)
        for key, value in doc['timestamps'].items():
            event['timestamps'][key].append(value)
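

# A minimal usage sketch (not part of the original gist; the document shapes
# below are assumptions for illustration): a producer thread fills the buffer
# while the caller drains it with dump().
def _demo_document_buffer():
    buffer = DocumentBuffer()

    def produce():
        buffer.insert('start', {'uid': 'abc'})
        buffer.insert('event', {'descriptor': 'd1', 'time': 1.0, 'seq_num': 1,
                                'data': {'x': 5}, 'timestamps': {'x': 1.0}})

    t = Thread(target=produce)
    t.start()
    t.join()  # let both inserts land so a single dump() drains them
    append_docs, event_docs = buffer.dump()
    print(append_docs)          # contains {'start': [{'uid': 'abc'}]}
    print(event_docs['event'])  # columns of values keyed by descriptor 'd1'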


class EmbeddedBroker():
    # TODO: prevent the same run from being inserted twice.
    # TODO: split a run document into pages if it exceeds the 16MB BSON limit.
    # TODO: create a new run in the database if it doesn't exist yet.
    def __init__(self, num_threads=1):
        self.documentBuffer = DocumentBuffer()
        self.client = MongoClient('localhost', 27017)
        self.db = self.client['test_db']
        self.table = self.db.runs
        self.run_id = None
        # Daemon workers drain the buffer and bulk-write into MongoDB.
        for _ in range(num_threads):
            t = Thread(target=self.__db_worker)
            t.daemon = True
            t.start()

    def __db_worker(self):
        while True:
            appendBuffer, eventBuffer = self.documentBuffer.dump()
            # dump() blocks until at least one document has arrived, but
            # either of the two buffers may still be empty.
            if appendBuffer:
                self.__bulkwrite_append(appendBuffer)
            if eventBuffer:
                self.__bulkwrite_embed(eventBuffer)

    def __bulkwrite_append(self, buffer):
        # $each pushes every buffered document individually; upsert is passed
        # as a keyword argument, not embedded in the update document.
        operations = [UpdateOne({'run_id': self.run_id},
                                {'$push': {doc_type: {'$each': docs}}},
                                upsert=True)
                      for doc_type, docs in buffer.items()]
        self.table.bulk_write(operations)

    # Currently specialized for event documents only.
    def __bulkwrite_embed(self, buffer):
        operations = []
        for descriptor, event in buffer['event'].items():
            # Push each buffered column onto the matching array embedded
            # under this descriptor in the run document.
            push = {f'events.{descriptor}.time': {'$each': event['time']['time']},
                    f'events.{descriptor}.seq_num': {'$each': event['seq_num']['seq_num']}}
            for key, values in event['data'].items():
                push[f'events.{descriptor}.data.{key}'] = {'$each': values}
            for key, values in event['timestamps'].items():
                push[f'events.{descriptor}.timestamps.{key}'] = {'$each': values}
            operations.append(UpdateOne({'run_id': self.run_id},
                                        {'$push': push}, upsert=True))
        self.table.bulk_write(operations)

    def __create_run(self, run_id):
        # Only touch the database when the run changes; create the run
        # document on first sight.
        if run_id != self.run_id:
            if self.table.find_one({'run_id': run_id}) is None:
                self.table.insert_one({'run_id': run_id, 'page': 1})
            self.run_id = run_id

    def insert(self, run_id, doc_type, doc):
        self.__create_run(run_id)
        self.documentBuffer.insert(doc_type, doc)
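

# A minimal end-to-end sketch (an assumption, not in the original gist): it
# expects a MongoDB server on localhost:27017, and since flushing happens on
# daemon threads, the short sleep only gives them a chance to run before exit.
if __name__ == '__main__':
    import time

    broker = EmbeddedBroker(num_threads=1)
    broker.insert('run1', 'start', {'uid': 'abc', 'time': 0.0})
    broker.insert('run1', 'event', {'descriptor': 'd1', 'time': 1.0,
                                    'seq_num': 1, 'data': {'x': 5},
                                    'timestamps': {'x': 1.0}})
    time.sleep(1)  # crude flush window; a real shutdown would join or flush explicitly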