Forked from danielballan/Simple Broker Demo.ipynb
Last active August 29, 2015 14:14
{
 "metadata": {
  "name": "",
  "signature": "sha256:4842fc356012005189072bdf38fe4e737d15e5d16ff268d95dd4136830c3ec18"
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "# Simple Data Broker Demo\n",
      "\n",
      "Here, we demonstrate that a simple implementation of the data broker can fetch and combine data from all three sources: the Metadata Store, the File Store, and the Channel Archiver. It has only one interface, a `search` function.\n",
      "\n",
      "Note the example in the docstring below, illustrating the format of the returned data."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from databroker import broker"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stderr",
       "text": [
        "/home/tcaswell/.virtualenvs/dd_py2k/lib/python2.7/site-packages/pytz/__init__.py:29: UserWarning: Module bson was already imported from /home/tcaswell/.virtualenvs/dd_py2k/lib/python2.7/site-packages/bson/__init__.pyc, but /home/tcaswell/.pe/dd_py2k/lib/python2.7/site-packages/mongoengine-0.8.7-py2.7.egg is being added to sys.path\n",
        "  from pkg_resources import resource_stream\n"
       ]
      }
     ],
     "prompt_number": 1
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "help(broker.search)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Help on function search in module databroker.broker.simple_broker:\n",
        "\n",
        "search(beamline_id, start_time, end_time, ca_host, channels=None)\n",
        "    Get data from all events from a given beamline between two times.\n",
        "    \n",
        "    Parameters\n",
        "    ----------\n",
        "    beamline_id : string\n",
        "        e.g., 'srx'\n",
        "    start_time : string or datetime object\n",
        "        e.g., datetime.datetime(2015, 1, 1) or '2015-01-01' (ISO format)\n",
        "    end_time : string or datetime object\n",
        "        e.g., datetime.datetime(2015, 1, 1) or '2015-01-01' (ISO format)\n",
        "    ca_host : URL string\n",
        "        the URL of your archiver's ArchiveDataServer.cgi. For example,\n",
        "        'http://cr01arc01/cgi-bin/ArchiveDataServer.cgi'\n",
        "    channels : list, optional\n",
        "        All queries will return applicable data from the N most popular\n",
        "        channels. If data from additional channels is needed, their full\n",
        "        identifiers (not human-readable names) must be given here as a list\n",
        "        of strings.\n",
        "    \n",
        "    Returns\n",
        "    -------\n",
        "    data : list\n",
        "        See example below illustrating the format of the returned dataset.\n",
        "    \n",
        "    Example\n",
        "    -------\n",
        "    >>> search('srx', '2015-01-01', '2015-01-02')\n",
        "    [(<unix epoch time>, {'chan1': <value>, 'chan2': <value>}),\n",
        "     (<unix epoch time>, {'temp': <value>})]\n",
        "    \n",
        "    That is, it returns a list of tuples, where each tuple contains a time and\n",
        "    a dictionary of name/value pairs. Every value is guaranteed to be either a\n",
        "    scalar Python primitive (int, float, string) or a numpy ndarray.\n",
        "\n"
       ]
      }
     ],
     "prompt_number": 2
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Sidebar: Dummy Sources\n",
      "\n",
      "The databroker repo also contains a `sources` module that imitates the APIs of the three sources. It will be useful for testing and development, and it serves as a stopgap until the new MDS format is fully specified and implemented.\n",
      "\n",
      "You can switch between using \"live\" and \"dummy\" versions of any source at runtime."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "from databroker import sources\n",
      "from datetime import datetime as dt\n",
      "from datetime import timedelta as tdelta\n",
      "import uuid\n",
      "\n",
      "import databroker.broker.simple_broker as sb\n",
      "import databroker.sources\n",
      "from databroker.tests.test_broker import generate_ca_data\n",
      "mds_capi = databroker.sources.metadataStore.api.collection\n",
      "mds_aapi = databroker.sources.metadataStore.api.analysis\n",
      "fs_coms = databroker.sources.fileStore.commands\n",
      "\n",
      "from fileStore.test.t_utils import SynHandlerMod, SynHandlerEcho\n",
      "from fileStore.retrieve import register_handler\n",
      "\n",
      "# Use the dummy Metadata Store and Channel Archiver, but the live File Store.\n",
      "# The API is the same for dummy and live sources.\n",
      "sources.switch(metadatastore=False, filestore=True, channelarchiver=False)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 3
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import databroker.sources.dummy_sources._metadataStore.api._dummies as mds_dum\n",
      "mds_dum._events.clear()\n",
      "mds_dum._ev_desc.clear()"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 4
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "register_handler('syn-mod', SynHandlerMod)\n",
      "register_handler('syn-echo', SynHandlerEcho)\n",
      "\n",
      "N = 2\n",
      "M = 4\n",
      "shape = (3, 4)\n",
      "\n",
      "f_types = ['syn-mod', 'syn-echo']"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 5
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The loop below saves synthetic events to the dummy Metadata Store and links each one to a File Store entry. To put simulated data into the dummy Channel Archiver, we pass the data, converted to a string, where we would normally give the host URL (the `ca_host` argument of `search`)."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "start = dt.now()\n",
      "\n",
      "f_types = ['syn-mod', 'syn-echo']\n",
      "s_name = ['mod_base', 'echo_base']\n",
      "for n in range(N):\n",
      "    # outer loop: one event descriptor (and one File Store file base) per n\n",
      "    # make the event descriptor\n",
      "    ev_desc = mds_capi.save_event_descriptor(\n",
      "        data_keys={s_name[n%len(s_name)]: {'source': 'PV:foo',},\n",
      "                   'img': {'source': 'CCD:syn',\n",
      "                           'external': 'FILESTORE'}})\n",
      "    # make file descriptor entry\n",
      "    fb = fs_coms.save_file_base(f_types[n%len(f_types)], 'filepath',\n",
      "                                {'shape': shape})\n",
      "    for m in range(M):\n",
      "        # inner loop: M events per descriptor, each linked to the file base by a UUID\n",
      "        a_val = n * M + m + 1\n",
      "        r_id = str(uuid.uuid4())\n",
      "        fs_coms.save_file_event_link(fb, r_id, {'n': a_val})\n",
      "        data_dict = {s_name[n%len(s_name)]: {'value': a_val, 'timestamp': dt.now()},\n",
      "                     'img': {'value': r_id, 'timestamp': dt.now()}}\n",
      "        event = mds_capi.save_event(event_descriptor=ev_desc, data=data_dict)\n",
      "\n",
      "end = dt.now()\n",
      "\n",
      "simulated_ca_data = generate_ca_data(['SR11BCM01:LIFETIME_MONITOR', 'SR11BCM01:CURRENT_MONITOR'], start, end + tdelta(minutes=5))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 6
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "print(start,end)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "(datetime.datetime(2015, 1, 23, 11, 13, 55, 86567), datetime.datetime(2015, 1, 23, 11, 13, 55, 98369))\n"
       ]
      }
     ],
     "prompt_number": 7
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## A Sample Query\n",
      "\n",
      "A basic query returns all the applicable data from the Metadata Store (MDS) and File Store (FS), along with data from the most commonly used Channel Archiver (CA) channels. Note that the records come back grouped by source, not sorted by time."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "broker.search('srx', start, end, ca_host=str(simulated_ca_data))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 8,
       "text": [
        "[(datetime.datetime(2015, 1, 23, 11, 13, 55, 92971),\n",
        "  {'img': array([[0, 0, 0, 0],\n",
        "       [0, 0, 0, 0],\n",
        "       [0, 0, 0, 0]]), 'mod_base': 1}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 93781),\n",
        "  {'img': array([[0, 1, 0, 1],\n",
        "       [0, 1, 0, 1],\n",
        "       [0, 1, 0, 1]]), 'mod_base': 2}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 94418),\n",
        "  {'img': array([[0, 1, 2, 0],\n",
        "       [1, 2, 0, 1],\n",
        "       [2, 0, 1, 2]]), 'mod_base': 3}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 95075),\n",
        "  {'img': array([[0, 1, 2, 3],\n",
        "       [0, 1, 2, 3],\n",
        "       [0, 1, 2, 3]]), 'mod_base': 4}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 96428),\n",
        "  {'echo_base': 5, 'img': array([[ 5.,  5.,  5.,  5.],\n",
        "       [ 5.,  5.,  5.,  5.],\n",
        "       [ 5.,  5.,  5.,  5.]])}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 97078),\n",
        "  {'echo_base': 6, 'img': array([[ 6.,  6.,  6.,  6.],\n",
        "       [ 6.,  6.,  6.,  6.],\n",
        "       [ 6.,  6.,  6.,  6.]])}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 97690),\n",
        "  {'echo_base': 7, 'img': array([[ 7.,  7.,  7.,  7.],\n",
        "       [ 7.,  7.,  7.,  7.],\n",
        "       [ 7.,  7.,  7.,  7.]])}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 98250),\n",
        "  {'echo_base': 8, 'img': array([[ 8.,  8.,  8.,  8.],\n",
        "       [ 8.,  8.,  8.,  8.],\n",
        "       [ 8.,  8.,  8.,  8.]])}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 0}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 14, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 1}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 15, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 2}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 16, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 3}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 17, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 4}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 18, 55, 86567),\n",
        "  {'SR11BCM01:LIFETIME_MONITOR': 5}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 13, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 0}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 14, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 1}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 15, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 2}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 16, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 3}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 17, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 4}),\n",
        " (datetime.datetime(2015, 1, 23, 11, 18, 55, 86567),\n",
        "  {'SR11BCM01:CURRENT_MONITOR': 5})]"
       ]
      }
     ],
     "prompt_number": 8
    },
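    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## Guts\n",
      "\n",
      "Inspect the raw documents stored in the Metadata Store. Note that the value of `'img'` is a UUID rather than an image: `search` resolves it through the File Store, using the registered handler, into the arrays shown in the sample query above."
     ]
    },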
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "raw_events = mds_aapi.find()\n",
      "[(ev.uid, ev.data) for ev in raw_events]"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "metadata": {},
       "output_type": "pyout",
       "prompt_number": 9,
       "text": [
        "[('b04a9e50-240e-491c-bc2a-6e662fac5ff1',\n",
        "  {'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 92870),\n",
        "           'value': '932e89d2-7a54-4df0-b3a5-cd43e00fc942'},\n",
        "   'mod_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 92850),\n",
        "                'value': 1}}),\n",
        " ('a205e86f-d361-4e83-ad47-3a5acb25cab3',\n",
        "  {'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 93726),\n",
        "           'value': '63a1616f-06c7-48bc-aa1e-d68f6cc72806'},\n",
        "   'mod_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 93717),\n",
        "                'value': 2}}),\n",
        " ('de747a97-ddbc-4c33-b24c-952b63b7d777',\n",
        "  {'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 94367),\n",
        "           'value': 'd9ce74c7-4799-451e-9e64-4718ee15d168'},\n",
        "   'mod_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 94359),\n",
        "                'value': 3}}),\n",
        " ('f849f3d4-f154-4285-b5a6-ff64684a47c9',\n",
        "  {'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 95025),\n",
        "           'value': 'b14900b3-0b26-40fb-b78e-df51499157f5'},\n",
        "   'mod_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 95018),\n",
        "                'value': 4}}),\n",
        " ('c5ff2535-0f5c-48e5-9231-930288a57ad2',\n",
        "  {'echo_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 96370),\n",
        "                 'value': 5},\n",
        "   'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 96378),\n",
        "           'value': '458050dc-684b-437f-b216-3ac0c8bd4a3f'}}),\n",
        " ('5af93ea6-a575-4774-bc9d-1410a3ed0573',\n",
        "  {'echo_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 97021),\n",
        "                 'value': 6},\n",
        "   'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 97029),\n",
        "           'value': 'a6bbb750-9cba-4ec1-aa21-c55f9827282b'}}),\n",
        " ('25ecdbc6-d026-4fe5-b7ef-eb2a72b96a3b',\n",
        "  {'echo_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 97634),\n",
        "                 'value': 7},\n",
        "   'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 97642),\n",
        "           'value': 'ef74be97-11f1-40dc-8162-36739605e5f1'}}),\n",
        " ('d118717d-ad41-43df-9406-a514f3049bfc',\n",
        "  {'echo_base': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 98194),\n",
        "                 'value': 8},\n",
        "   'img': {'timestamp': datetime.datetime(2015, 1, 23, 11, 13, 55, 98202),\n",
        "           'value': '337f257a-27d6-4a95-bb81-7765124670f7'}})]"
       ]
      }
     ],
     "prompt_number": 9
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "## What about the muggler?\n",
      "\n",
      "All communication with the three data sources takes place through the broker. (Fuller implementations of the broker will manage various optimizations in data retrieval. All of that must be isolated from higher layers.) The muggler can make requests for additional data, such as adding a source from the Channel Archiver, but it will do so through the broker."
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 8
    }
   ],
   "metadata": {}
  }
 ]
}