{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install graphviz\n",
"!pip install dask distributed --upgrade\n",
"!pip install bokeh"
]
},
{
"cell_type": "code",
"execution_count": 167,
"metadata": {},
"outputs": [],
"source": [
"import xarray\n",
"import random\n",
"import string\n",
"import os\n",
"from functools import partial\n",
" \n",
"def resource():\n",
" resource = {'path_semantics': 'posix',\n",
" 'resource_kwargs': {},\n",
" 'resource_path': '~/data_file',\n",
" 'root': '~/',\n",
" 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd',\n",
" 'spec': 'SOME_SPEC',\n",
" 'uid': '272132cf-564f-428f-bf6b-149ee4287024'}\n",
" if not os.path.isfile(resource['resource_path']):\n",
" with open(os.path.expanduser(resource['resource_path']), \"w+\") as file:\n",
" file.write(str(range(10000)))\n",
" \n",
" return resource\n",
"\n",
"def xevents(numcol, numrows):\n",
" stream_key = 'descriptor' \n",
" coord_key = 'time' \n",
" array_keys = ['seq_num', 'time', 'uid'] \n",
" dataframe_keys = ['data', 'timestamps', 'filled'] \n",
" \n",
" coords = range(numrows)\n",
" xarr = partial(to_xarray, coord_label=coord_key, coords=coords)\n",
" datakeys = list(string.ascii_lowercase)[0:numcol]\n",
" \n",
" xpage = {**{stream_key: random.randint(0,100)}, \n",
" **{key: xarr(range(numrows), name=key) for key in array_keys}, \n",
" **{'data': xarray.merge({key: xarr(randlist(numrows), name=key) \n",
" for key in datakeys}.values())}, \n",
" **{'timestamps': xarray.merge({key: xarr(randlist(numrows), name=key) \n",
" for key in datakeys}.values())}, \n",
" **{'filled': xarray.merge({key: xarr(randlist(numrows), name=key) \n",
" for key in datakeys}.values())}} \n",
" \n",
" return xpage\n",
"\n",
"def xdatum(numcol, numrows): \n",
" stream_key = 'resource' \n",
" coord_key = 'datum_id' \n",
" array_keys = ['datum_id'] \n",
" dataframe_keys = ['datum_kwargs'] \n",
" \n",
" coords = range(numrows)\n",
" xarr = partial(to_xarray, coord_label=coord_key, coords=coords)\n",
" datakeys = list(string.ascii_lowercase)[0:numcol]\n",
" \n",
" xpage = {**{stream_key: random.randint(0,100)}, \n",
" **{key: xarr(range(numrows), name=key) for key in array_keys}, \n",
" **{'datum_kwargs': xarray.merge({key: xarr(randlist(numrows), name=key) \n",
" for key in datakeys}.values())}}\n",
" \n",
" return xpage\n",
"\n",
"\n",
"def randlist(length):\n",
" return [random.randint(0,100) for _ in range(length)] \n",
"\n",
"\n",
"def to_xarray(array, coord_label, coords, name):\n",
" array = (xarray.DataArray(array, \n",
" dims=(coord_label,), \n",
" coords={coord_label: coords}, \n",
" name=name))\n",
" return array"
]
},
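{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick, illustrative check of the synthetic page builders above (assuming the previous cell has been run): build a small event page with `xevents` and look at its keys and its `data` Dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: build a tiny synthetic event page and inspect it.\n",
"page = xevents(3, 4)        # 3 data columns ('a', 'b', 'c'), 4 rows\n",
"print(sorted(page.keys()))  # array keys plus 'data', 'timestamps', 'filled'\n",
"page['data']                # an xarray.Dataset with one variable per column"
]
},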
{
"cell_type": "code",
"execution_count": 168,
"metadata": {},
"outputs": [],
"source": [
"from event_model import DocumentRouter\n",
"import numpy as np\n",
"from dask import array\n",
"from dask import delayed\n",
"import xarray\n",
"import time\n",
"\n",
"class DaskFiller():\n",
"\n",
" def __init__(self, handler_registry=None, *, \n",
" include=None, exclude=None, root_map=None,\n",
" handler_cache={}, resource_cache={}, datum_cache={}, chunk_size = 100,\n",
" retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032,\n",
" 0.064, 0.128, 0.256, 0.512, 1.024)):\n",
" if include is not None and exclude is not None:\n",
" raise EventModelValueError(\n",
" \"The parameters `include` and `exclude` are mutually \"\n",
" \"incompatible. At least one must be left as the default, \"\n",
" \"None.\")\n",
" self.handler_registry = handler_registry\n",
" self.include = include\n",
" self.exclude = exclude\n",
" self.root_map = root_map or {}\n",
" self._handler_cache = handler_cache or {}\n",
" self._resource_cache = resource_cache or {}\n",
" self._datum_cache = datum_cache or {}\n",
" self._chunk_size = chunk_size\n",
" self.retry_intervals = list(retry_intervals)\n",
" self._closed = False\n",
"\n",
" def __repr__(self):\n",
" return \"<Filler>\" if not self._closed else \"<Closed Filler>\"\n",
" \n",
" def fill(self, bes):\n",
" # bes is short for BlueskyEventStream\n",
" descriptor_docs = [doc for doc in bes._get_event_descriptors() \n",
" if doc.get('name') == bes._stream_name]\n",
" \n",
" data_keys = descriptor_docs[0]['data_keys'] \n",
" print(data_keys)\n",
" more_keys = ['seq_num', 'uid'] \n",
" needs_filling = {key for key, value in data_keys.items() if value.get('external', False)} \n",
" \n",
" filled_pages = []\n",
" for descriptor in descriptor_docs:\n",
" filled_pages.extend([self._fill_eventpage(event_page, needs_filling) \n",
" for event_page in bes._get_eventpages([descriptor['uid']])])\n",
" \n",
" return filled_pages\n",
"\n",
" def _fill_eventpage(self, event_page, needs_filling): \n",
" stream_key = 'descriptor' \n",
" coord_key = 'time' \n",
" array_keys = ['seq_num', 'time', 'uid'] \n",
" dataframe_keys = ['data', 'timestamps', 'filled'] \n",
" \n",
" coords = event_page[coord_key] \n",
" xarr = partial(self._to_xarray, coord_label=coord_key, coords=coords) \n",
" filled_page = {**{stream_key: event_page[stream_key]}, \n",
" **{key: xarr(event_page[key], name=key) for key in array_keys}, \n",
" **{'data': xarray.merge({key: xarr(event_page['data'][key], name=key, fill=(key in needs_filling)) \n",
" for key in event_page['data'].keys()}.values())}, \n",
" **{'timestamps': xarray.merge({key: xarr(event_page['timestamps'][key], name=key)\n",
" for key in event_page['data'].keys()}.values())}, \n",
" **{'filled': xarray.merge({key: xarr(event_page['filled'][key], name=key) \n",
" for key in event_page['data'].keys()}.values())}} \n",
" \n",
" return filled_page\n",
" \n",
" def _xarray_datumpages(self, datum_page): \n",
" stream_key = 'resource' \n",
" coord_key = 'datum_id' \n",
" array_keys = ['datum_id'] \n",
" dataframe_keys = ['datum_kwargs'] \n",
" \n",
" coords = datum_page[coord_key] \n",
" xarr = partial(self._to_xarray, coord_label=coord_key, coords=coords) \n",
" xarray_datum = {**{stream_key: datum_page[stream_key]}, \n",
" **{key: xarr(datum_page[key], name=key) for key in array_keys}, \n",
" **{'datum_kwargs': xarray.merge({key: xarr(datum_page['datum_kwargs'][key], name=key)\n",
" for key in datum_page['datum_kwargs'].keys()}.values())}} \n",
" return xarray_datum\n",
"\n",
" def _to_xarray(self, collumn, coord_label, coords, name, fill=False):\n",
" if fill:\n",
" return xarray.DataArray(array.concatenate(\n",
" [array.from_delayed(self._fill_chunk(chunk), (len(chunk),), dtype=object)\n",
" for chunk in self._chunks(collumn, self._chunk_size)]))\n",
" else:\n",
" return (xarray.DataArray(collumn, \n",
" dims=(coord_label,), \n",
" coords={coord_label: coords}, \n",
" name=name))\n",
"\n",
" def _chunks(self, col, chunk_size):\n",
" for i in range(0, len(col), chunk_size):\n",
" yield col[i:i + chunk_size]\n",
"\n",
" @delayed\n",
" def _fill_chunk(self, chunk):\n",
" return np.asarray([self._fill_item(item) for item in chunk])\n",
" \n",
" def _fill_item(self, item):\n",
" return item\n",
" \n",
" def event(self, item):\n",
" for key, is_filled in doc.get('filled', {}).items():\n",
" if self.exclude is not None and key in self.exclude:\n",
" continue\n",
" if self.include is not None and key not in self.include:\n",
" continue\n",
" if not is_filled:\n",
" datum_id = doc['data'][key]\n",
" # Look up the cached Datum doc.\n",
" try:\n",
" datum_doc = self._datum_cache[datum_id]\n",
" except KeyError as err:\n",
" err_with_key = UnresolvableForeignKeyError(\n",
" f\"Event with uid {doc['uid']} refers to unknown Datum \"\n",
" f\"datum_id {datum_id}\")\n",
" err_with_key.key = datum_id\n",
" raise err_with_key from err\n",
" resource_uid = datum_doc['resource']\n",
" # Look up the cached Resource.\n",
" try:\n",
" resource = self._resource_cache[resource_uid]\n",
" except KeyError as err:\n",
" raise UnresolvableForeignKeyError(\n",
" f\"Datum with id {datum_id} refers to unknown Resource \"\n",
" f\"uid {resource_uid}\") from err\n",
" # Look up the cached handler instance, or instantiate one.\n",
" try:\n",
" handler = self._handler_cache[resource['uid']]\n",
" except KeyError:\n",
" try:\n",
" handler_class = self.handler_registry[resource['spec']]\n",
" except KeyError as err:\n",
" raise UndefinedAssetSpecification(\n",
" f\"Resource document with uid {resource['uid']} \"\n",
" f\"refers to spec {resource['spec']!r} which is \"\n",
" f\"not defined in the Filler's \"\n",
" f\"handler registry.\") from err\n",
" try:\n",
" # Apply root_map.\n",
" resource_path = resource['resource_path']\n",
" root = resource.get('root', '')\n",
" root = self.root_map.get(root, root)\n",
" if root:\n",
" resource_path = os.path.join(root, resource_path)\n",
"\n",
" handler = handler_class(resource_path,\n",
" **resource['resource_kwargs'])\n",
" except Exception as err:\n",
" raise EventModelError(\n",
" f\"Error instantiating handler \"\n",
" f\"class {handler_class} \"\n",
" f\"with Resource document {resource}.\") from err\n",
" self._handler_cache[resource['uid']] = handler\n",
"\n",
" # We are sure to attempt to read that data at least once and\n",
" # then perhaps additional times depending on the contents of\n",
" # retry_intervals.\n",
" error = None\n",
" for interval in [0] + self.retry_intervals:\n",
" ttime.sleep(interval)\n",
" try:\n",
" actual_data = handler(**datum_doc['datum_kwargs'])\n",
" # Here we are intentionally modifying doc in place.\n",
" doc['data'][key] = actual_data\n",
" doc['filled'][key] = datum_id\n",
" except IOError as error_:\n",
" # The file may not be visible on the network yet.\n",
" # Wait and try again. Stash the error in a variable\n",
" # that we can access later if we run out of attempts.\n",
" error = error_\n",
" else:\n",
" break\n",
" else:\n",
" # We have used up all our attempts. There seems to be an\n",
" # actual problem. Raise the error stashed above.\n",
" raise DataNotAccessible(\n",
" f\"Filler was unable to load the data referenced by \"\n",
" f\"the Datum document {datum_doc} and the Resource \"\n",
" f\"document {resource}.\") from error\n",
" return doc\n",
" \n",
" def __enter__(self):\n",
" return self\n",
"\n",
" def close(self):\n",
" # Drop references to the caches. If the user holds another reference to\n",
" # them it's the user's problem to manage their lifecycle. If the user\n",
" # does not (e.g. they are the default caches) the gc will look after\n",
" # them.\n",
" self._closed = True\n",
" self._handler_cache = None\n",
" self._resource_cache = None\n",
" self._datum_cache = None\n",
"\n",
" def __exit__(self, *exc_details):\n",
" self.close()\n",
"\n",
" def __call__(self, name, doc, validate=False):\n",
" if self._closed:\n",
" raise EventModelRuntimeError(\n",
" \"This Filler has been closed and is no longer usable.\") \n"
]
},
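{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal, standalone sketch of the lazy-fill pattern that `DaskFiller._to_xarray` uses when `fill=True`: split a column into chunks, wrap each chunk in `dask.delayed`, turn each delayed result into a dask array with `dask.array.from_delayed`, and `concatenate` the pieces. The helper names here (`slow_load`, `chunks`) are illustrative stand-ins, not part of the class."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Standalone sketch of the chunked lazy-fill pattern used by DaskFiller.\n",
"# `slow_load` stands in for reading the external data behind a datum id.\n",
"import numpy as np\n",
"from dask import array, delayed\n",
"\n",
"\n",
"def slow_load(chunk):\n",
"    return np.asarray(chunk)\n",
"\n",
"\n",
"def chunks(col, chunk_size):\n",
"    for i in range(0, len(col), chunk_size):\n",
"        yield col[i:i + chunk_size]\n",
"\n",
"\n",
"column = list(range(16))\n",
"lazy = array.concatenate(\n",
"    [array.from_delayed(delayed(slow_load)(chunk), (len(chunk),), dtype=int)\n",
"     for chunk in chunks(column, 4)])\n",
"lazy.compute()  # nothing is read until compute() is called"
]
},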
{
"cell_type": "code",
"execution_count": 175,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'a': {'external': True}, 'b': {'external': True}, 'c': {'external': True}, 'd': {'external': True}, 'e': {'external': True}}\n"
]
},
{
"data": {
"text/plain": [
"<xarray.Dataset>\n",
"Dimensions: (dim_0: 16)\n",
"Dimensions without coordinates: dim_0\n",
"Data variables:\n",
" concatenate-310b50d2ae6c2aa9bffa8ad9e3e948c0 (dim_0) int64 16 97 ... 20 55\n",
" concatenate-b6d0c2882a1399ab6836845deadc669c (dim_0) int64 36 49 ... 39 31\n",
" concatenate-875db7b37eef10cbc0b783c156c74e44 (dim_0) int64 19 44 ... 69 37\n",
" concatenate-7d25690c3159f94b9cd50c94aafc7254 (dim_0) int64 54 84 2 ... 8 54\n",
" concatenate-0e1d0d16b3166c7a77f2527a75b20738 (dim_0) int64 98 65 ... 34 49"
]
},
"execution_count": 175,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"class BlueskyEventStream():\n",
" def __init__(self, num_col, num_row):\n",
" self._num_row = num_row\n",
" self._num_col = num_col\n",
" self._stream_name = 'primary'\n",
" \n",
" def _get_eventpages(self, descriptor_uid):\n",
" yield from [xevents(self._num_col,self._num_row)]*10\n",
" \n",
" def _get_datumpages(self, resource_uid):\n",
" yield from [xdatum(self._num_col,self._num_row)]*10\n",
" \n",
" def _get_event_descriptors(self):\n",
" data_keys = list(string.ascii_lowercase)[0:self._num_col]\n",
" return [{'name':'primary', 'uid':1, 'data_keys': {key:{'external': True} for key in data_keys}} \n",
" for i in range(3)]\n",
"\n",
" \n",
"bes = BlueskyEventStream(5,16)\n",
"filler = DaskFiller(chunk_size=4)\n",
"filled_events = filler.fill(bes)\n",
"#filled_events[0]['data'].to_dask_dataframe().visualize()\n",
"filled_events[0]['data'].compute() "
]
},
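{
"cell_type": "markdown",
"metadata": {},
"source": [
"The variables in the filled page are dask-backed, so the page can be handed around and inspected without loading anything; data are only read when `.compute()` is called, as in the cell above. A quick, illustrative way to see this (assuming the cells above have been run):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: each data variable is backed by a lazy dask array.\n",
"ds = filled_events[0]['data']\n",
"{name: type(da.data) for name, da in ds.data_vars.items()}"
]
},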
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}