Last active
December 12, 2018 22:28
-
-
Save gwbischof/c15270caef2c8c1ac1855593201689a0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict
from threading import Condition, Lock


class DocumentBuffer():
    """Thread-safe staging area for documents waiting to be written in bulk.

    Producers call :meth:`insert`; consumers call :meth:`dump`, which blocks
    until at least one document has been inserted and then hands back (and
    resets) everything accumulated so far.

    Attributes:
        empty: True while nothing has been inserted since the last dump.
        AppendBuffer: ``doc_type -> [doc, ...]`` for non-event documents.
        EventBuffer: ``doc_type -> descriptor -> field -> key -> [values]``
            for event documents, where ``field`` is one of ``'time'``,
            ``'seq_num'``, ``'data'`` or ``'timestamps'``.
        mutex: lock guarding both buffers and ``empty``.
        dump_block: condition (bound to ``mutex``) that ``dump`` waits on.
    """

    # TODO: insert should block when the buffer is full (not implemented).

    def __init__(self):
        self.empty = True
        self.AppendBuffer = defaultdict(list)
        self.EventBuffer = self._new_event_buffer()
        self.mutex = Lock()
        self.dump_block = Condition(self.mutex)

    @staticmethod
    def _new_event_buffer():
        """Return an empty event buffer.

        The innermost level must be ``defaultdict(list)`` so that
        ``buffer[doc_type][descriptor][field][key].append(...)`` works on the
        first access of every key.
        """
        return defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    lambda: defaultdict(list))))

    def insert(self, doc_type, doc):
        """Buffer ``doc`` and wake one consumer blocked in :meth:`dump`.

        Args:
            doc_type: document kind; ``'event'`` documents are stored
                column-wise in ``EventBuffer``, everything else is appended
                to ``AppendBuffer[doc_type]``.
            doc: the document (a mapping).

        Raises:
            KeyError: if an event document lacks one of the keys
                ``descriptor``, ``time``, ``seq_num``, ``data``,
                ``timestamps``. The buffer is left unchanged in that case.
        """
        # Entering the condition acquires ``mutex``; notify() is only legal
        # while the lock is held, so it has to happen inside this block.
        with self.dump_block:
            if doc_type == 'event':
                self.__embed(doc_type, doc)
            else:
                self.__append(doc_type, doc)
            self.empty = False
            self.dump_block.notify()

    def dump(self):
        """Block until the buffer is non-empty, then return and reset it.

        Returns:
            A tuple ``(append_buffer, event_buffer)`` holding everything
            inserted since the previous dump.
        """
        with self.dump_block:
            # wait() releases the lock while sleeping and re-acquires it on
            # wake-up; the loop re-checks the flag because another consumer
            # may have drained the buffer first.
            while self.empty:
                self.dump_block.wait()
            append_dump = self.AppendBuffer
            event_dump = self.EventBuffer
            self.AppendBuffer = defaultdict(list)
            self.EventBuffer = self._new_event_buffer()
            self.empty = True
        return append_dump, event_dump

    def __append(self, doc_type, doc):
        """Append ``doc`` to the list kept for ``doc_type``."""
        self.AppendBuffer[doc_type].append(doc)

    def __embed(self, doc_type, doc):
        """Spread one event document over the per-descriptor column lists."""
        # Read every required key before touching the buffer so a malformed
        # document cannot leave a partially updated entry behind.
        descriptor = doc['descriptor']
        time = doc['time']
        seq_num = doc['seq_num']
        data = doc['data']
        timestamps = doc['timestamps']

        columns = self.EventBuffer[doc_type][descriptor]
        columns['time']['time'].append(time)
        columns['seq_num']['seq_num'].append(seq_num)
        for key, value in data.items():
            columns['data'][key].append(value)
        for key, value in timestamps.items():
            columns['timestamps'][key].append(value)
import logging
from threading import Thread

from pymongo import InsertOne, MongoClient, UpdateOne
from pymongo.errors import BulkWriteError


class EmbeddedBroker():
    """Buffer documents for a run and flush them to MongoDB from worker threads.

    Documents passed to :meth:`insert` are staged in a ``DocumentBuffer``;
    daemon worker threads repeatedly drain that buffer and apply the contents
    to the run's record in ``test_db.runs`` with bulk ``$push`` updates.

    Known gaps (carried over from the original notes):
      * nothing prevents the same run from being inserted twice;
      * documents are not split into pages when they would exceed the
        16MB BSON limit.

    NOTE(review): workers read ``self.run_id`` at write time, so documents
    buffered for one run can be written under the next run if ``insert`` is
    called with a new ``run_id`` before they are flushed.
    """

    # Shared by all instances; created when the class body executes.
    client = MongoClient('localhost', 27017)

    def __init__(self, num_threads=1):
        """Set up the collection handle and start ``num_threads`` workers."""
        # Must exist before any worker or __create_run reads it.
        self.run_id = None
        self.documentBuffer = DocumentBuffer()
        self.db = self.client['test_db']
        self.table = self.db.runs
        for _ in range(num_threads):
            worker = Thread(target=self.__db_worker)
            worker.daemon = True
            worker.start()

    def __db_worker(self):
        """Drain the document buffer forever, writing each batch to MongoDB."""
        log = logging.getLogger(__name__)
        while True:
            append_buffer, event_buffer = self.documentBuffer.dump()
            batches = (
                (append_buffer, self.__bulkwrite_append),
                (event_buffer, self.__bulkwrite_embed),
            )
            for pending, write in batches:
                if not pending:
                    continue
                try:
                    write(pending)
                except BulkWriteError:
                    # Keep the worker alive; a dead thread would silently
                    # stop every later flush.
                    log.exception('bulk write failed for run %r', self.run_id)

    def __bulkwrite_append(self, buffer):
        """Push every buffered document onto the array named after its type.

        Args:
            buffer: mapping ``doc_type -> [doc, ...]``.
        """
        operations = [
            # $each appends the documents individually instead of pushing
            # the whole list as one array element.
            UpdateOne({'run_id': self.run_id},
                      {'$push': {doc_type: {'$each': docs}}},
                      upsert=True)
            for doc_type, docs in buffer.items()
        ]
        # bulk_write rejects an empty operation list.
        if operations:
            self.table.bulk_write(operations)

    # Currently specialized for event documents only
    def __bulkwrite_embed(self, buffer):
        """Append buffered event columns to the run record.

        Args:
            buffer: mapping ``doc_type -> descriptor -> field -> key ->
                [values]``; only the ``'event'`` entry is written.

        Each value list is pushed onto the array at the dotted path
        ``events.<descriptor>.<field>.<key>``.
        NOTE(review): assumes descriptor ids and data keys are strings
        without '.' characters, since they become part of a field path.
        """
        operations = []
        for descriptor, fields in buffer.get('event', {}).items():
            push = {}
            for field, columns in fields.items():
                for key, values in columns.items():
                    path = 'events.{}.{}.{}'.format(descriptor, field, key)
                    push[path] = {'$each': values}
            if push:
                operations.append(
                    UpdateOne({'run_id': self.run_id},
                              {'$push': push},
                              upsert=True))
        if operations:
            self.table.bulk_write(operations)

    def __create_run(self, run_id):
        """Make ``run_id`` current, creating its record if it is missing."""
        if run_id == self.run_id:
            return
        # Look up (and create) the NEW run, not the previously active one.
        if self.table.find_one({'run_id': run_id}) is None:
            self.table.insert_one({'run_id': run_id, 'page': 1})
        self.run_id = run_id

    def insert(self, run_id, doc_type, doc):
        """Queue ``doc`` of kind ``doc_type`` for writing under ``run_id``."""
        self.__create_run(run_id)
        self.documentBuffer.insert(doc_type, doc)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment