from collections import defaultdict
from threading import Lock


class DocumentBuffer():
    # Insert should block if buffer is full

    def __init__(self):
        self.empty = True
        self.AppendBuffer = defaultdict(list)
        self.EventBuffer = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
from pymongo import InsertOne, DeleteOne, ReplaceOne
from pymongo.errors import BulkWriteError
from pymongo import MongoClient

client = MongoClient('localhost', 27017)
update_test = client['UpdateOne']
table = update_test.table


# Drops the whole test database (not just the collection) to start fresh
def drop_testdb():
    client.drop_database(update_test)
class DocBuffer():
    """
    DocBuffer is a thread-safe "embedding" buffer for bluesky event or datum
    documents.

    "Embedding" refers to combining multiple documents from a stream of
    documents into a single document, where the values of matching keys are
    stored as a list, or a dictionary of lists. "Embedding" converts event docs
    to event_pages, or datum docs to datum_pages. event_pages and datum_pages
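
The "embedding" described in the docstring above can be illustrated with a minimal sketch. It is not the DocBuffer implementation (the preview is cut off before any of it is shown); the function name embed_events is hypothetical, and the document layout (descriptor, uid, seq_num, time, data, timestamps) is assumed to follow the usual bluesky event schema.

```python
from collections import defaultdict


def embed_events(events):
    """Combine event documents sharing one descriptor into an event_page-like dict.

    Hypothetical helper for illustration only; not part of DocBuffer.
    """
    if not events:
        raise ValueError("need at least one event document")
    descriptor = events[0]["descriptor"]
    if any(event["descriptor"] != descriptor for event in events):
        raise ValueError("all events must share the same descriptor")

    page = {
        "descriptor": descriptor,
        "uid": [],
        "seq_num": [],
        "time": [],
        "data": defaultdict(list),
        "timestamps": defaultdict(list),
    }
    for event in events:
        # Scalar keys become lists, one entry per event.
        for key in ("uid", "seq_num", "time"):
            page[key].append(event[key])
        # Dict-valued keys become dictionaries of lists.
        for key in ("data", "timestamps"):
            for field, value in event[key].items():
                page[key][field].append(value)

    # Convert back to plain dicts so the result is an ordinary document.
    page["data"] = dict(page["data"])
    page["timestamps"] = dict(page["timestamps"])
    return page
```

Two events with data {'x': 1} and {'x': 2} would come out as a single page whose data is {'x': [1, 2]}, which is the "dictionary of lists" the docstring refers to.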
import inputs
import time
import threading


class GamePad:
    def __init__(self, ldms_button='BTN_TL', rdms_button='BTN_TR',
                 activate_button='BTN_EAST', status_pv=None):
        self._ldms_button = ldms_button
import inputs
import time
import threading
from epics import caget, caput, cainfo


class GamePad:
    def __init__(self, ldms_button='BTN_TL', rdms_button='BTN_TR',
                 activate_button='BTN_START', status_pv=None):
import itertools


def page_chunks(page, chunk_size, remainder):
    array_keys = ['seq_num', 'time', 'uid']
    page_size = len(page['uid'])
    chunks = [(0, remainder)]
    chunks.extend([(i, i + chunk_size) for i
                   in range(remainder, page_size, chunk_size)])
    for start, stop in chunks:
        yield {'descriptor': page['descriptor'],
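
The preview of page_chunks stops at the yield, so the rest of the function is not shown. The following is only a sketch of how such chunking could work, not the author's code. The name page_chunks_sketch is hypothetical, the dict-valued keys (data, timestamps, filled) are assumed from the usual event_page layout, and skipping an empty first chunk when remainder is 0 is an assumption of this sketch.

```python
def page_chunks_sketch(page, chunk_size, remainder=0):
    """Yield slices of an event_page-like dict.

    The first chunk holds up to `remainder` events; later chunks hold up to
    `chunk_size` events each. Hypothetical helper for illustration only.
    """
    array_keys = ["seq_num", "time", "uid"]
    dict_keys = ["data", "timestamps", "filled"]
    page_size = len(page["uid"])

    # Assumption: an empty leading chunk is skipped rather than yielded.
    first = min(remainder, page_size)
    bounds = [(0, first)] if first else []
    bounds.extend((i, min(i + chunk_size, page_size))
                  for i in range(first, page_size, chunk_size))

    for start, stop in bounds:
        chunk = {"descriptor": page["descriptor"]}
        # List-valued keys are sliced directly.
        for key in array_keys:
            chunk[key] = page[key][start:stop]
        # Dict-of-lists keys are sliced field by field; absent keys give {}.
        for key in dict_keys:
            chunk[key] = {field: values[start:stop]
                          for field, values in page.get(key, {}).items()}
        yield chunk
```

With five events, chunk_size=2 and remainder=1, this yields chunks of one, two and two events.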
import xarray
import string
import random
from functools import partial


def xevents(numcol, numrows):
    stream_key = 'descriptor'
    coord_key = 'time'
    array_keys = ['seq_num', 'time', 'uid']
    dataframe_keys = ['data', 'timestamps', 'filled']