Skip to content

Instantly share code, notes, and snippets.

View gwbischof's full-sized avatar
🧐
Learning

Garrett Bischof gwbischof

🧐
Learning
View GitHub Profile
@gwbischof
gwbischof / MongoBox.ipynb
Created November 9, 2018 22:05
Really easy way to get a mongodb running, with demonstration of a few PyMongo read/write methods.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
class DocumentBuffer():
# Insert should block if buffer is full
from collections import defaultdict
from threading import Lock
def __init__(self):
self.empty = True
self.AppendBuffer = defaultdict(list)
self.EventBuffer = defaultdict(lambda : defaultdict( lambda : defaultdict(dict)))
from pymongo import InsertOne, DeleteOne, ReplaceOne
from pymongo.errors import BulkWriteError
from pymongo import MongoClient
client = MongoClient('localhost', 27017)
update_test = client['UpdateOne']
table = update_test.table
# drops the table to start fresh
def drop_testdb():
client.drop_database(update_test)
@gwbischof
gwbischof / DocBuffer.py
Created March 28, 2019 14:25
DocBuffer
class DocBuffer():
"""
DocBuffer is a thread-safe "embedding" buffer for bluesky event or datum
documents.
"embedding" refers to combining multiple documents from a stream of
documents into a single document, where the values of matching keys are
stored as a list, or dictionary of lists. "embedding" converts event docs
to event_pages, or datum doc to datum_pages. event_pages and datum_pages
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import inputs
import time
import threading
class GamePad:
def __init__(self, ldms_button='BTN_TL',rdms_button='BTN_TR',
activate_button='BTN_EAST', status_pv=None):
self._ldms_button = ldms_button
@gwbischof
gwbischof / pdf_gamepad.py
Created June 21, 2019 21:31
pdf gamepad
import inputs
import time
import threading
from epics import caget, caput, cainfo
class GamePad:
def __init__(self, ldms_button='BTN_TL',rdms_button='BTN_TR',
activate_button='BTN_START', status_pv=None):
import itertools
def page_chunks(page, chunk_size, remainder):
array_keys = ['seq_num', 'time', 'uid']
page_size = len(page['uid'])
chunks = [(0,remainder)]
chunks.extend([(i, i + chunk_size) for i
in range(remainder, page_size, chunk_size)])
for start, stop in chunks:
yield {'descriptor': page['descriptor'],
@gwbischof
gwbischof / merge.py
Last active June 27, 2019 18:01
merge xarrays
import xarray
import string
import random
from functools import partial
def xevents(numcol, numrows):
stream_key = 'descriptor'
coord_key = 'time'
array_keys = ['seq_num', 'time', 'uid']
dataframe_keys = ['data', 'timestamps', 'filled']