{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install graphviz\n",
"!pip install dask distributed --upgrade\n",
"!pip install bokeh"
]
},
{
"cell_type": "code",
"execution_count": 167,
"metadata": {},
"outputs": [],
"source": [
"import xarray\n", | |
"import random\n", | |
"import string\n", | |
"import os\n", | |
"from functools import partial\n", | |
" \n", | |
"def resource():\n", | |
" resource = {'path_semantics': 'posix',\n", | |
" 'resource_kwargs': {},\n", | |
" 'resource_path': '~/data_file',\n", | |
" 'root': '~/',\n", | |
" 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd',\n", | |
" 'spec': 'SOME_SPEC',\n", | |
" 'uid': '272132cf-564f-428f-bf6b-149ee4287024'}\n", | |
" if not os.path.isfile(resource['resource_path']):\n", | |
" with open(os.path.expanduser(resource['resource_path']), \"w+\") as file:\n", | |
" file.write(str(range(10000)))\n", | |
" \n", | |
" return resource\n", | |
"\n", | |
"def xevents(numcol, numrows):\n", | |
" stream_key = 'descriptor' \n", | |
" coord_key = 'time' \n", | |
" array_keys = ['seq_num', 'time', 'uid'] \n", | |
" dataframe_keys = ['data', 'timestamps', 'filled'] \n", | |
" \n", | |
" coords = range(numrows)\n", | |
" xarr = partial(to_xarray, coord_label=coord_key, coords=coords)\n", | |
" datakeys = list(string.ascii_lowercase)[0:numcol]\n", | |
" \n", | |
" xpage = {**{stream_key: random.randint(0,100)}, \n", | |
" **{key: xarr(range(numrows), name=key) for key in array_keys}, \n", | |
" **{'data': xarray.merge({key: xarr(randlist(numrows), name=key) \n", | |
" for key in datakeys}.values())}, \n", | |
" **{'timestamps': xarray.merge({key: xarr(randlist(numrows), name=key) \n", | |
" for key in datakeys}.values())}, \n", | |
" **{'filled': xarray.merge({key: xarr(randlist(numrows), name=key) \n", | |
" for key in datakeys}.values())}} \n", | |
" \n", | |
" return xpage\n", | |
"\n", | |
"def xdatum(numcol, numrows): \n", | |
" stream_key = 'resource' \n", | |
" coord_key = 'datum_id' \n", | |
" array_keys = ['datum_id'] \n", | |
" dataframe_keys = ['datum_kwargs'] \n", | |
" \n", | |
" coords = range(numrows)\n", | |
" xarr = partial(to_xarray, coord_label=coord_key, coords=coords)\n", | |
" datakeys = list(string.ascii_lowercase)[0:numcol]\n", | |
" \n", | |
" xpage = {**{stream_key: random.randint(0,100)}, \n", | |
" **{key: xarr(range(numrows), name=key) for key in array_keys}, \n", | |
" **{'datum_kwargs': xarray.merge({key: xarr(randlist(numrows), name=key) \n", | |
" for key in datakeys}.values())}}\n", | |
" \n", | |
" return xpage\n", | |
"\n", | |
"\n", | |
"def randlist(length):\n", | |
" return [random.randint(0,100) for _ in range(length)] \n", | |
"\n", | |
"\n", | |
"def to_xarray(array, coord_label, coords, name):\n", | |
" array = (xarray.DataArray(array, \n", | |
" dims=(coord_label,), \n", | |
" coords={coord_label: coords}, \n", | |
" name=name))\n", | |
" return array" | |
] | |
}, | |
{
"cell_type": "code",
"execution_count": 168,
"metadata": {},
"outputs": [],
"source": [
"from event_model import (EventModelError, EventModelValueError,\n",
"                         EventModelRuntimeError, UnresolvableForeignKeyError,\n",
"                         UndefinedAssetSpecification, DataNotAccessible)\n",
"from functools import partial\n",
"import numpy as np\n",
"from dask import array\n",
"from dask import delayed\n",
"import os\n",
"import xarray\n",
"import time\n",
"\n",
"class DaskFiller():\n", | |
"\n", | |
" def __init__(self, handler_registry=None, *, \n", | |
" include=None, exclude=None, root_map=None,\n", | |
" handler_cache={}, resource_cache={}, datum_cache={}, chunk_size = 100,\n", | |
" retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032,\n", | |
" 0.064, 0.128, 0.256, 0.512, 1.024)):\n", | |
" if include is not None and exclude is not None:\n", | |
" raise EventModelValueError(\n", | |
" \"The parameters `include` and `exclude` are mutually \"\n", | |
" \"incompatible. At least one must be left as the default, \"\n", | |
" \"None.\")\n", | |
" self.handler_registry = handler_registry\n", | |
" self.include = include\n", | |
" self.exclude = exclude\n", | |
" self.root_map = root_map or {}\n", | |
" self._handler_cache = handler_cache or {}\n", | |
" self._resource_cache = resource_cache or {}\n", | |
" self._datum_cache = datum_cache or {}\n", | |
" self._chunk_size = chunk_size\n", | |
" self.retry_intervals = list(retry_intervals)\n", | |
" self._closed = False\n", | |
"\n", | |
" def __repr__(self):\n", | |
" return \"<Filler>\" if not self._closed else \"<Closed Filler>\"\n", | |
" \n", | |
" def fill(self, bes):\n", | |
" # bes is short for BlueskyEventStream\n", | |
" descriptor_docs = [doc for doc in bes._get_event_descriptors() \n", | |
" if doc.get('name') == bes._stream_name]\n", | |
" \n", | |
" data_keys = descriptor_docs[0]['data_keys'] \n", | |
" print(data_keys)\n", | |
" more_keys = ['seq_num', 'uid'] \n", | |
" needs_filling = {key for key, value in data_keys.items() if value.get('external', False)} \n", | |
" \n", | |
" filled_pages = []\n", | |
" for descriptor in descriptor_docs:\n", | |
" filled_pages.extend([self._fill_eventpage(event_page, needs_filling) \n", | |
" for event_page in bes._get_eventpages([descriptor['uid']])])\n", | |
" \n", | |
" return filled_pages\n", | |
"\n", | |
" def _fill_eventpage(self, event_page, needs_filling): \n", | |
" stream_key = 'descriptor' \n", | |
" coord_key = 'time' \n", | |
" array_keys = ['seq_num', 'time', 'uid'] \n", | |
" dataframe_keys = ['data', 'timestamps', 'filled'] \n", | |
" \n", | |
" coords = event_page[coord_key] \n", | |
" xarr = partial(self._to_xarray, coord_label=coord_key, coords=coords) \n", | |
" filled_page = {**{stream_key: event_page[stream_key]}, \n", | |
" **{key: xarr(event_page[key], name=key) for key in array_keys}, \n", | |
" **{'data': xarray.merge({key: xarr(event_page['data'][key], name=key, fill=(key in needs_filling)) \n", | |
" for key in event_page['data'].keys()}.values())}, \n", | |
" **{'timestamps': xarray.merge({key: xarr(event_page['timestamps'][key], name=key)\n", | |
" for key in event_page['data'].keys()}.values())}, \n", | |
" **{'filled': xarray.merge({key: xarr(event_page['filled'][key], name=key) \n", | |
" for key in event_page['data'].keys()}.values())}} \n", | |
" \n", | |
" return filled_page\n", | |
" \n", | |
" def _xarray_datumpages(self, datum_page): \n", | |
" stream_key = 'resource' \n", | |
" coord_key = 'datum_id' \n", | |
" array_keys = ['datum_id'] \n", | |
" dataframe_keys = ['datum_kwargs'] \n", | |
" \n", | |
" coords = datum_page[coord_key] \n", | |
" xarr = partial(self._to_xarray, coord_label=coord_key, coords=coords) \n", | |
" xarray_datum = {**{stream_key: datum_page[stream_key]}, \n", | |
" **{key: xarr(datum_page[key], name=key) for key in array_keys}, \n", | |
" **{'datum_kwargs': xarray.merge({key: xarr(datum_page['datum_kwargs'][key], name=key)\n", | |
" for key in datum_page['datum_kwargs'].keys()}.values())}} \n", | |
" return xarray_datum\n", | |
"\n", | |
" def _to_xarray(self, collumn, coord_label, coords, name, fill=False):\n", | |
" if fill:\n", | |
" return xarray.DataArray(array.concatenate(\n", | |
" [array.from_delayed(self._fill_chunk(chunk), (len(chunk),), dtype=object)\n", | |
" for chunk in self._chunks(collumn, self._chunk_size)]))\n", | |
" else:\n", | |
" return (xarray.DataArray(collumn, \n", | |
" dims=(coord_label,), \n", | |
" coords={coord_label: coords}, \n", | |
" name=name))\n", | |
"\n", | |
" def _chunks(self, col, chunk_size):\n", | |
" for i in range(0, len(col), chunk_size):\n", | |
" yield col[i:i + chunk_size]\n", | |
"\n", | |
" @delayed\n", | |
" def _fill_chunk(self, chunk):\n", | |
" return np.asarray([self._fill_item(item) for item in chunk])\n", | |
" \n", | |
" def _fill_item(self, item):\n", | |
" return item\n", | |
" \n", | |
" def event(self, item):\n", | |
" for key, is_filled in doc.get('filled', {}).items():\n", | |
" if self.exclude is not None and key in self.exclude:\n", | |
" continue\n", | |
" if self.include is not None and key not in self.include:\n", | |
" continue\n", | |
" if not is_filled:\n", | |
" datum_id = doc['data'][key]\n", | |
" # Look up the cached Datum doc.\n", | |
" try:\n", | |
" datum_doc = self._datum_cache[datum_id]\n", | |
" except KeyError as err:\n", | |
" err_with_key = UnresolvableForeignKeyError(\n", | |
" f\"Event with uid {doc['uid']} refers to unknown Datum \"\n", | |
" f\"datum_id {datum_id}\")\n", | |
" err_with_key.key = datum_id\n", | |
" raise err_with_key from err\n", | |
" resource_uid = datum_doc['resource']\n", | |
" # Look up the cached Resource.\n", | |
" try:\n", | |
" resource = self._resource_cache[resource_uid]\n", | |
" except KeyError as err:\n", | |
" raise UnresolvableForeignKeyError(\n", | |
" f\"Datum with id {datum_id} refers to unknown Resource \"\n", | |
" f\"uid {resource_uid}\") from err\n", | |
" # Look up the cached handler instance, or instantiate one.\n", | |
" try:\n", | |
" handler = self._handler_cache[resource['uid']]\n", | |
" except KeyError:\n", | |
" try:\n", | |
" handler_class = self.handler_registry[resource['spec']]\n", | |
" except KeyError as err:\n", | |
" raise UndefinedAssetSpecification(\n", | |
" f\"Resource document with uid {resource['uid']} \"\n", | |
" f\"refers to spec {resource['spec']!r} which is \"\n", | |
" f\"not defined in the Filler's \"\n", | |
" f\"handler registry.\") from err\n", | |
" try:\n", | |
" # Apply root_map.\n", | |
" resource_path = resource['resource_path']\n", | |
" root = resource.get('root', '')\n", | |
" root = self.root_map.get(root, root)\n", | |
" if root:\n", | |
" resource_path = os.path.join(root, resource_path)\n", | |
"\n", | |
" handler = handler_class(resource_path,\n", | |
" **resource['resource_kwargs'])\n", | |
" except Exception as err:\n", | |
" raise EventModelError(\n", | |
" f\"Error instantiating handler \"\n", | |
" f\"class {handler_class} \"\n", | |
" f\"with Resource document {resource}.\") from err\n", | |
" self._handler_cache[resource['uid']] = handler\n", | |
"\n", | |
" # We are sure to attempt to read that data at least once and\n", | |
" # then perhaps additional times depending on the contents of\n", | |
" # retry_intervals.\n", | |
" error = None\n", | |
" for interval in [0] + self.retry_intervals:\n", | |
" ttime.sleep(interval)\n", | |
" try:\n", | |
" actual_data = handler(**datum_doc['datum_kwargs'])\n", | |
" # Here we are intentionally modifying doc in place.\n", | |
" doc['data'][key] = actual_data\n", | |
" doc['filled'][key] = datum_id\n", | |
" except IOError as error_:\n", | |
" # The file may not be visible on the network yet.\n", | |
" # Wait and try again. Stash the error in a variable\n", | |
" # that we can access later if we run out of attempts.\n", | |
" error = error_\n", | |
" else:\n", | |
" break\n", | |
" else:\n", | |
" # We have used up all our attempts. There seems to be an\n", | |
" # actual problem. Raise the error stashed above.\n", | |
" raise DataNotAccessible(\n", | |
" f\"Filler was unable to load the data referenced by \"\n", | |
" f\"the Datum document {datum_doc} and the Resource \"\n", | |
" f\"document {resource}.\") from error\n", | |
" return doc\n", | |
" \n", | |
" def __enter__(self):\n", | |
" return self\n", | |
"\n", | |
" def close(self):\n", | |
" # Drop references to the caches. If the user holds another reference to\n", | |
" # them it's the user's problem to manage their lifecycle. If the user\n", | |
" # does not (e.g. they are the default caches) the gc will look after\n", | |
" # them.\n", | |
" self._closed = True\n", | |
" self._handler_cache = None\n", | |
" self._resource_cache = None\n", | |
" self._datum_cache = None\n", | |
"\n", | |
" def __exit__(self, *exc_details):\n", | |
" self.close()\n", | |
"\n", | |
" def __call__(self, name, doc, validate=False):\n", | |
" if self._closed:\n", | |
" raise EventModelRuntimeError(\n", | |
" \"This Filler has been closed and is no longer usable.\") \n" | |
] | |
}, | |
{
"cell_type": "code",
"execution_count": 175,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'a': {'external': True}, 'b': {'external': True}, 'c': {'external': True}, 'd': {'external': True}, 'e': {'external': True}}\n"
]
},
{
"data": {
"text/plain": [
"<xarray.Dataset>\n",
"Dimensions: (dim_0: 16)\n",
"Dimensions without coordinates: dim_0\n",
"Data variables:\n",
"    concatenate-310b50d2ae6c2aa9bffa8ad9e3e948c0 (dim_0) int64 16 97 ... 20 55\n",
"    concatenate-b6d0c2882a1399ab6836845deadc669c (dim_0) int64 36 49 ... 39 31\n",
"    concatenate-875db7b37eef10cbc0b783c156c74e44 (dim_0) int64 19 44 ... 69 37\n",
"    concatenate-7d25690c3159f94b9cd50c94aafc7254 (dim_0) int64 54 84 2 ... 8 54\n",
"    concatenate-0e1d0d16b3166c7a77f2527a75b20738 (dim_0) int64 98 65 ... 34 49"
]
},
"execution_count": 175,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"class BlueskyEventStream():\n", | |
" def __init__(self, num_col, num_row):\n", | |
" self._num_row = num_row\n", | |
" self._num_col = num_col\n", | |
" self._stream_name = 'primary'\n", | |
" \n", | |
" def _get_eventpages(self, descriptor_uid):\n", | |
" yield from [xevents(self._num_col,self._num_row)]*10\n", | |
" \n", | |
" def _get_datumpages(self, resource_uid):\n", | |
" yield from [xdatum(self._num_col,self._num_row)]*10\n", | |
" \n", | |
" def _get_event_descriptors(self):\n", | |
" data_keys = list(string.ascii_lowercase)[0:self._num_col]\n", | |
" return [{'name':'primary', 'uid':1, 'data_keys': {key:{'external': True} for key in data_keys}} \n", | |
" for i in range(3)]\n", | |
"\n", | |
" \n", | |
"bes = BlueskyEventStream(5,16)\n", | |
"filler = DaskFiller(chunk_size=4)\n", | |
"filled_events = filler.fill(bes)\n", | |
"#filled_events[0]['data'].to_dask_dataframe().visualize()\n", | |
"filled_events[0]['data'].compute() " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.7.1" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |