Created
March 28, 2019 14:25
-
-
Save gwbischof/8bb0b2ca40a57c71a4daf08d756337ff to your computer and use it in GitHub Desktop.
DocBuffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DocBuffer:
    """
    DocBuffer is a thread-safe "embedding" buffer for bluesky event or datum
    documents.

    "Embedding" refers to combining multiple documents from a stream of
    documents into a single document, where the values of matching keys are
    stored as a list, or as a dictionary of lists. Embedding converts event
    docs to event_pages, or datum docs to datum_pages. event_pages and
    datum_pages are defined by the bluesky event-model.

    Events with different descriptors, or datum with different resources, are
    stored in separate embedded documents in the buffer. The buffer uses a
    defaultdict, so new embedded documents are created automatically when
    they are needed. The dump method clears the buffer completely; this is
    how the lifetime of the embedded documents in the buffer is managed.

    The doc_type argument, which must be either 'event' or 'datum', selects
    how documents are embedded, because the details differ for event and
    datum documents. Internally the buffer is a dictionary that maps event
    descriptors to event_pages, or datum resources to datum_pages.

    Parameters
    ----------
    doc_type: str
        {'event', 'datum'}
    buffer_size: int
        Maximum buffer size in bytes, between 400 and 15000000 inclusive.
        Document sizes are measured with ``sys.getsizeof``, which is shallow
        (it does not follow nested values), so this bounds the size of the
        top-level document dicts only.

    Attributes
    ----------
    current_size: int
        Current size of the buffer: the sum of ``sys.getsizeof`` of every
        document inserted since the last dump.
    worker_error: object
        Initially None. If it is set to a truthy value (presumably by a
        consumer thread), insert raises a RuntimeError that reports it.

    Raises
    ------
    AttributeError
        If buffer_size is out of range or doc_type is not recognized.
    """

    # Inclusive bounds accepted for buffer_size.
    _MIN_BUFFER_SIZE = 400
    _MAX_BUFFER_SIZE = 15000000

    def __init__(self, doc_type, buffer_size):
        self.worker_error = None
        self._frozen = False
        self._mutex = threading.Lock()
        # Both conditions share one mutex, so inserters and dumpers are
        # serialized against each other while still waiting on separate
        # predicates.
        self._not_full = threading.Condition(self._mutex)
        self._not_empty = threading.Condition(self._mutex)
        self._doc_buffer = self._new_buffer()
        self.current_size = 0

        if self._MIN_BUFFER_SIZE <= buffer_size <= self._MAX_BUFFER_SIZE:
            self._buffer_size = buffer_size
        else:
            raise AttributeError(
                f"Invalid buffer_size {buffer_size}, "
                f"buffer_size must be between {self._MIN_BUFFER_SIZE} and "
                f"{self._MAX_BUFFER_SIZE} inclusive.")

        # Event docs and datum docs are embedded differently; this configures
        # the buffer for the specified document type.
        #   _array_keys:     top-level values collected into a list.
        #   _dataframe_keys: dict values whose inner values are collected
        #                    into a dict of lists.
        #   _stream_id_key:  key whose value selects the embedded page.
        if doc_type == "event":
            self._array_keys = {"seq_num", "time", "uid"}
            self._dataframe_keys = {"data", "timestamps", "filled"}
            self._stream_id_key = "descriptor"
        elif doc_type == "datum":
            self._array_keys = {"datum_id"}
            self._dataframe_keys = {"datum_kwargs"}
            self._stream_id_key = "resource"
        else:
            raise AttributeError(f"Invalid doc_type {doc_type}, doc_type must "
                                 "be either 'event' or 'datum'")

    @staticmethod
    def _new_buffer():
        """Return an empty buffer: stream id -> page -> (inner key -> list)."""
        return defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

    def insert(self, doc):
        """
        Insert a bluesky event or datum document into the buffer. This method
        blocks while the buffer does not have room for the document.

        Parameters
        ----------
        doc: dict
            A validated bluesky event or datum document.

        Raises
        ------
        RuntimeError
            If worker_error is set, if the buffer is frozen, or if the
            document alone is at least as large as the buffer.
        """
        if self.worker_error:
            raise RuntimeError("Worker exception: " + str(self.worker_error))
        if self._frozen:
            raise RuntimeError("Cannot insert documents into a "
                               "frozen DocBuffer")
        # Shallow size of the top-level dict only; see class docstring.
        doc_size = sys.getsizeof(doc)
        # A document that can never fit would block forever below.
        if doc_size >= self._buffer_size:
            raise RuntimeError("Failed to insert. Document size is greater "
                               "than max buffer size")
        with self._not_full:
            # Block until a dump frees enough room for this document.
            self._not_full.wait_for(
                lambda: self.current_size + doc_size < self._buffer_size)
            self._buffer_insert(doc)
            self.current_size += doc_size
            # Wake up threads that are waiting to dump.
            self._not_empty.notify_all()

    def dump(self):
        """
        Return everything in the buffer and clear the buffer. This method
        blocks while the buffer is empty, unless the buffer is frozen.

        Returns
        -------
        doc_buffer_dump: dict
            A (nested default-)dictionary that maps each event descriptor to
            its embedded page, or each datum resource to its embedded page.
            Empty if the buffer is frozen and nothing was buffered.
        """
        with self._not_empty:
            # Not blocking when frozen lets worker threads finish when the
            # run is frozen.
            self._not_empty.wait_for(
                lambda: self.current_size or self._frozen)
            # Hand off the current buffer and start a fresh one, so the
            # caller owns the returned pages outright.
            doc_buffer_dump = self._doc_buffer
            self._doc_buffer = self._new_buffer()
            self.current_size = 0
            # Wake up all threads that are waiting to insert.
            self._not_full.notify_all()
        return doc_buffer_dump

    def _buffer_insert(self, doc):
        """Embed ``doc`` into its stream's page. Caller must hold the mutex."""
        page = self._doc_buffer[doc[self._stream_id_key]]
        for key, value in doc.items():
            if key in self._array_keys:
                # dict.setdefault bypasses the page's defaultdict factory, so
                # array keys hold plain lists that are appended in place.
                page.setdefault(key, []).append(value)
            elif key in self._dataframe_keys:
                for inner_key, inner_value in value.items():
                    page[key][inner_key].append(inner_value)
            else:
                # Any other key (e.g. the stream id itself) keeps the value
                # from the most recently inserted document.
                page[key] = value

    def freeze(self):
        """
        Freeze the buffer, preventing new inserts and stopping dump from
        blocking.
        """
        with self._not_empty:
            self._frozen = True
            # Release any dumpers waiting on an empty buffer.
            self._not_empty.notify_all()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment