Skip to content

Instantly share code, notes, and snippets.

@gwbischof
Created March 28, 2019 14:25
Show Gist options
  • Save gwbischof/8bb0b2ca40a57c71a4daf08d756337ff to your computer and use it in GitHub Desktop.
Save gwbischof/8bb0b2ca40a57c71a4daf08d756337ff to your computer and use it in GitHub Desktop.
DocBuffer
class DocBuffer:
    """
    DocBuffer is a thread-safe "embedding" buffer for bluesky event or datum
    documents.

    "embedding" refers to combining multiple documents from a stream of
    documents into a single document, where the values of matching keys are
    stored as a list, or dictionary of lists. "embedding" converts event docs
    to event_pages, or datum docs to datum_pages. event_pages and datum_pages
    are defined by the bluesky event-model.

    Events with different descriptors, or datum with different resources, are
    stored in separate embedded documents in the buffer. The buffer uses a
    defaultdict so new embedded documents are automatically created when they
    are needed. The dump method totally clears the buffer. This mechanism
    automatically manages the lifetime of the embedded documents in the
    buffer.

    The doc_type argument can be either 'event' or 'datum'.
    The details of the embedding differ for event and datum documents.
    Internally the buffer is a dictionary that maps event descriptors to
    event_pages or datum resources to datum_pages.

    Parameters
    ----------
    doc_type: str
        {'event', 'datum'}
    buffer_size: int
        Maximum buffer size in bytes; must be between 400 and 15000000
        inclusive.

    Attributes
    ----------
    current_size: int
        Current (approximate) size of the buffer in bytes.
    """

    def __init__(self, doc_type, buffer_size):
        # Set by a worker thread on failure; checked and re-raised by insert.
        self.worker_error = None
        self._frozen = False
        # Both conditions share one mutex so insert and dump serialize on it.
        self._mutex = threading.Lock()
        self._not_full = threading.Condition(self._mutex)
        self._not_empty = threading.Condition(self._mutex)
        # Maps stream id -> embedded document (event_page / datum_page).
        self._doc_buffer = defaultdict(lambda: defaultdict(
            lambda: defaultdict(list)))
        self.current_size = 0

        if 400 <= buffer_size <= 15000000:
            self._buffer_size = buffer_size
        else:
            # BUG FIX: the message previously claimed a lower bound of 10000
            # while the check enforced 400; the message now matches the check.
            raise AttributeError(f"Invalid buffer_size {buffer_size}, "
                                 "buffer_size must be between 400 and "
                                 "15000000 inclusive.")

        # Event docs and datum docs are embedded differently; this configures
        # the buffer for the specified document type.
        if doc_type == "event":
            self._array_keys = {"seq_num", "time", "uid"}
            self._dataframe_keys = {"data", "timestamps", "filled"}
            self._stream_id_key = "descriptor"
        elif doc_type == "datum":
            self._array_keys = {"datum_id"}
            self._dataframe_keys = {"datum_kwargs"}
            self._stream_id_key = "resource"
        else:
            raise AttributeError(f"Invalid doc_type {doc_type}, doc_type must "
                                 "be either 'event' or 'datum'")

    def insert(self, doc):
        """
        Insert a bluesky event or datum document into the buffer. This
        method blocks if the buffer is full.

        Parameters
        ----------
        doc: dict
            A validated bluesky event or datum document.

        Raises
        ------
        RuntimeError
            If a worker error has been recorded, the buffer is frozen, or
            the document by itself exceeds the maximum buffer size.
        """
        if self.worker_error:
            raise RuntimeError("Worker exception: " + str(self.worker_error))

        if self._frozen:
            raise RuntimeError("Cannot insert documents into a "
                               "frozen DocBuffer")

        # NOTE: sys.getsizeof is shallow -- it does not include the sizes of
        # the values the dict references, so sizes are approximate.
        doc_size = sys.getsizeof(doc)
        if doc_size >= self._buffer_size:
            raise RuntimeError("Failed to insert. Document size is greater "
                               "than max buffer size")

        # Block until the document fits in the buffer.
        with self._not_full:
            self._not_full.wait_for(lambda: (self.current_size + doc_size)
                                    < self._buffer_size)
            self._buffer_insert(doc)
            self.current_size += doc_size
            # Wake up threads that are waiting to dump.
            self._not_empty.notify_all()

    def dump(self):
        """
        Get everything in the buffer and clear the buffer. This method
        blocks if the buffer is empty, unless the buffer is frozen.

        Returns
        -------
        doc_buffer_dump: dict
            A dictionary that maps event descriptor to event_page, or a
            dictionary that maps datum resource to datum_page.
        """
        # Block if the buffer is empty. Don't block if the buffer is frozen;
        # this allows all worker threads to finish when freezing the run.
        with self._not_empty:
            self._not_empty.wait_for(lambda: (
                self.current_size or self._frozen))

            # Hand back the current buffer and start a fresh, empty one.
            doc_buffer_dump = self._doc_buffer
            self._doc_buffer = defaultdict(lambda: defaultdict(
                lambda: defaultdict(list)))
            self.current_size = 0

            # Wake up all threads that are waiting to insert.
            self._not_full.notify_all()
            return doc_buffer_dump

    def _buffer_insert(self, doc):
        # Embed the doc into the page for its stream. Caller must hold the
        # buffer mutex.
        page = self._doc_buffer[doc[self._stream_id_key]]
        for key, value in doc.items():
            if key in self._array_keys:
                # The first defaultdict access yields an empty
                # defaultdict(list); coerce it to a plain list before
                # appending scalar values.
                page[key] = list(page[key])
                page[key].append(value)
            elif key in self._dataframe_keys:
                # Nested mapping: append each inner value to its own column
                # list inside the page.
                for inner_key, inner_value in value.items():
                    page[key][inner_key].append(inner_value)
            else:
                # Scalar metadata (e.g. the stream id itself) is overwritten,
                # not accumulated.
                page[key] = value

    def freeze(self):
        """
        Freeze the buffer, preventing new inserts and stopping dump from
        blocking.
        """
        self._frozen = True
        # Hold the condition's lock while notifying (required by
        # threading.Condition); use a with-block instead of manual
        # acquire/release so the lock is released even if notify raises.
        with self._mutex:
            self._not_empty.notify_all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment