Created
May 24, 2022 03:30
-
-
Save rabernat/15f77fb447e2cdbc73c4031c59768886 to your computer and use it in GitHub Desktop.
WIP Xarray to Zarr pipeline with Beam
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"id": "07aa0beb", | |
"metadata": {}, | |
"source": [ | |
"What is hard about Pangeo Forge.\n", | |
"\n", | |
"- Unknown size of inputs\n", | |
"- Possibly uneven size of inputs\n", | |
"- Need to write regular Zarr chunks\n", | |
"- Need to initialize the Zarr dataset" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "f6478a1a-89ec-4120-a738-2502d671994d", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# copied / modified from tests\n", | |
"\n", | |
"import xarray as xr\n", | |
"import numpy as np\n", | |
"import pandas as pd\n", | |
"import os\n", | |
"\n", | |
"from tempfile import TemporaryDirectory\n", | |
"\n", | |
"def daily_xarray_dataset():\n", | |
" \"\"\"Return a synthetic random xarray dataset.\"\"\"\n", | |
" np.random.seed(1)\n", | |
" # TODO: change nt to 11 in order to catch the edge case where\n", | |
" # items_per_input does not evenly divide the length of the sequence dimension\n", | |
" nt, ny, nx = 10, 18, 36\n", | |
" time = pd.date_range(start=\"2010-01-01\", periods=nt, freq=\"D\")\n", | |
" lon = (np.arange(nx) + 0.5) * 360 / nx\n", | |
" lon_attrs = {\"units\": \"degrees_east\", \"long_name\": \"longitude\"}\n", | |
" lat = (np.arange(ny) + 0.5) * 180 / ny\n", | |
" lat_attrs = {\"units\": \"degrees_north\", \"long_name\": \"latitude\"}\n", | |
" foo = np.random.rand(nt, ny, nx)\n", | |
" foo_attrs = {\"long_name\": \"Fantastic Foo\"}\n", | |
" # make sure things work with heterogenous data types\n", | |
" bar = np.random.randint(0, 10, size=(nt, ny, nx))\n", | |
" bar_attrs = {\"long_name\": \"Beautiful Bar\"}\n", | |
" dims = (\"time\", \"lat\", \"lon\")\n", | |
" ds = xr.Dataset(\n", | |
" {\"bar\": (dims, bar, bar_attrs), \"foo\": (dims, foo, foo_attrs)},\n", | |
" coords={\n", | |
" \"time\": (\"time\", time),\n", | |
" \"lat\": (\"lat\", lat, lat_attrs),\n", | |
" \"lon\": (\"lon\", lon, lon_attrs),\n", | |
" },\n", | |
" attrs={\"conventions\": \"CF 1.6\"},\n", | |
" )\n", | |
" return ds\n", | |
"\n", | |
"\n", | |
"def split_up_files_by_day(ds, day_param):\n", | |
" gb = ds.resample(time=f\"{day_param}D\")\n", | |
" _, datasets = zip(*gb)\n", | |
" fnames = [f\"{n:03d}.nc\" for n in range(len(datasets))]\n", | |
" return datasets, fnames\n", | |
"\n", | |
"\n", | |
"def make_netcdf_local_paths(items_per_file=2):\n", | |
" td = TemporaryDirectory()\n", | |
" tmp_path = td.name\n", | |
" \n", | |
" ds = daily_xarray_dataset()\n", | |
" datasets, fnames = split_up_files_by_day(ds, items_per_file)\n", | |
"\n", | |
" full_paths = [os.path.join(tmp_path, fname) for fname in fnames]\n", | |
" xr.save_mfdataset(datasets, [str(path) for path in full_paths])\n", | |
" return td, full_paths\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "e4690e7a-5f19-4b38-a46c-8532ab1030f3", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc',\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc',\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc',\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc',\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc']" | |
] | |
}, | |
"execution_count": 2, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"td, paths = make_netcdf_local_paths()\n", | |
"paths" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "1c6b92d7-8d0d-4c29-a866-212a736f6322", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"ds0 = xr.open_dataset(paths[0])\n", | |
"assert not ds0.foo._in_memory\n", | |
"ds0_dict = ds0.to_dict(data=False)\n", | |
"assert not ds0.foo._in_memory" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "98162cda-7993-4ef0-b1ff-671fa80103e0", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"{'unlimited_dims': set(),\n", | |
" 'source': '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'}" | |
] | |
}, | |
"execution_count": 4, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"ds0.encoding" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "ed81fdf4-da1e-434e-9888-89ac30335ae5", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"{'coords': {'time': {'dims': ('time',),\n", | |
" 'attrs': {},\n", | |
" 'dtype': 'datetime64[ns]',\n", | |
" 'shape': (2,)},\n", | |
" 'lat': {'dims': ('lat',),\n", | |
" 'attrs': {'units': 'degrees_north', 'long_name': 'latitude'},\n", | |
" 'dtype': 'float64',\n", | |
" 'shape': (18,)},\n", | |
" 'lon': {'dims': ('lon',),\n", | |
" 'attrs': {'units': 'degrees_east', 'long_name': 'longitude'},\n", | |
" 'dtype': 'float64',\n", | |
" 'shape': (36,)}},\n", | |
" 'attrs': {'conventions': 'CF 1.6'},\n", | |
" 'dims': {'time': 2, 'lat': 18, 'lon': 36},\n", | |
" 'data_vars': {'bar': {'dims': ('time', 'lat', 'lon'),\n", | |
" 'attrs': {'long_name': 'Beautiful Bar'},\n", | |
" 'dtype': 'int64',\n", | |
" 'shape': (2, 18, 36)},\n", | |
" 'foo': {'dims': ('time', 'lat', 'lon'),\n", | |
" 'attrs': {'long_name': 'Fantastic Foo'},\n", | |
" 'dtype': 'float64',\n", | |
" 'shape': (2, 18, 36)}}}" | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# does not include encoding; otherwise might be enough for template\n", | |
"# maybe that's an xarray issue to open?\n", | |
"ds0_dict" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "73e40c1a-30e8-4925-8726-7cdf77e628df", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<FilePattern {'time': 5}>" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, Index, CombineOp\n", | |
"\n", | |
"base_path = td.name\n", | |
"def format_function(time):\n", | |
" return f\"{base_path}/{time:03d}.nc\"\n", | |
"\n", | |
"fp = FilePattern(\n", | |
" format_function,\n", | |
" ConcatDim(\"time\", list(range(5)))\n", | |
")\n", | |
"fp" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "25ba5713-d80c-4593-849a-a1f32da52af4", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[(frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'),\n", | |
" (frozenset({DimIndex(name='time', index=1, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc'),\n", | |
" (frozenset({DimIndex(name='time', index=2, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc'),\n", | |
" (frozenset({DimIndex(name='time', index=3, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc'),\n", | |
" (frozenset({DimIndex(name='time', index=4, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n", | |
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc')]" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"list(fp.items())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "697d66fb-0b04-4b7a-a503-baa9a96382bf", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})" | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"index, item = next(fp.items())\n", | |
"index" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "9d53a649-fb38-47f0-920d-52e50dc6d497", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})" | |
] | |
}, | |
"execution_count": 9, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"index" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "7dd68912", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"application/javascript": [ | |
"\n", | |
" if (typeof window.interactive_beam_jquery == 'undefined') {\n", | |
" var jqueryScript = document.createElement('script');\n", | |
" jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", | |
" jqueryScript.type = 'text/javascript';\n", | |
" jqueryScript.onload = function() {\n", | |
" var datatableScript = document.createElement('script');\n", | |
" datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", | |
" datatableScript.type = 'text/javascript';\n", | |
" datatableScript.onload = function() {\n", | |
" window.interactive_beam_jquery = jQuery.noConflict(true);\n", | |
" window.interactive_beam_jquery(document).ready(function($){\n", | |
" \n", | |
" });\n", | |
" }\n", | |
" document.head.appendChild(datatableScript);\n", | |
" };\n", | |
" document.head.appendChild(jqueryScript);\n", | |
" } else {\n", | |
" window.interactive_beam_jquery(document).ready(function($){\n", | |
" \n", | |
" });\n", | |
" }" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n" | |
] | |
} | |
], | |
"source": [ | |
"import apache_beam as beam\n", | |
"from apache_beam.options import pipeline_options\n", | |
"from apache_beam.runners.interactive import interactive_runner\n", | |
"import apache_beam.runners.interactive.interactive_beam as ib\n", | |
"\n", | |
"from typing import Tuple, List, Sequence, Dict, Any, TypeVar, Optional\n", | |
"\n", | |
"\n", | |
"T = TypeVar('T') \n", | |
"\n", | |
"class LoadXarrayDataset(beam.DoFn):\n", | |
" def process(self, element: Tuple[Index, str]) -> List[Tuple[Index, xr.Dataset]]:\n", | |
" key, path = element\n", | |
" ds = xr.open_dataset(path)\n", | |
" return [(key, ds)]\n", | |
" \n", | |
"class XarrayDatasetToSchema(beam.DoFn):\n", | |
" def process(self, element: Tuple[Index, xr.Dataset]) -> List[Tuple[Index, Dict]]:\n", | |
" key, ds = element\n", | |
" ds_dict = ds.to_dict(data=False)\n", | |
" return [(key, ds_dict)]\n", | |
" \n", | |
"class SchemaToDimLen(beam.DoFn):\n", | |
" def process(self, element: Tuple[Index, Dict], dim: str) -> List[Tuple[Index, int]]:\n", | |
" index, schema = element\n", | |
" return [(index, schema['dims'][dim])]\n", | |
" \n", | |
"class DropIndex(beam.DoFn):\n", | |
" def process(self, element: Tuple[Index, T]) -> List[T]:\n", | |
" return [element[1]]\n", | |
" \n", | |
" \n", | |
"# TODO:\n", | |
"# - don't hard code time as the concat-dim\n", | |
"# - allow multiple concat dims\n", | |
"# - generalize to include metadata harmonization\n", | |
"def calculate_total_size(schemas: Sequence[Tuple[Index, Dict]], dim: str) -> int:\n", | |
" # hopefully the data variables have not been loaded\n", | |
" return sum(time_lens)\n", | |
" \n", | |
"\n", | |
"def is_first_element(element: Tuple[Index, T], concat_dim: str) -> bool:\n", | |
" index, item = element\n", | |
" for dindex in index:\n", | |
" if (dindex.name == concat_dim) and (dindex.operation == CombineOp.CONCAT) and (dindex.index == 0):\n", | |
" return True\n", | |
" return False\n", | |
"\n", | |
"def get_first_element(elements: Sequence[Tuple[Index, T]], concat_dim: str):\n", | |
" for e in elements:\n", | |
" if is_first_element(e, concat_dim):\n", | |
" return e\n", | |
"\n", | |
"\n", | |
"class DummyPrepareTarget(beam.DoFn):\n", | |
" def process(self, element: Tuple[Index, xr.Dataset], dim: str, total_len: int) -> None:\n", | |
" # should actually return a zarr group\n", | |
" index, ds = element\n", | |
" print(index, ds.dims, dim, total_len)\n", | |
" \n", | |
"\n", | |
" \n", | |
"options = pipeline_options.PipelineOptions()\n", | |
"runner = interactive_runner.InteractiveRunner()\n", | |
"\n", | |
"p = beam.Pipeline(runner, options=options)\n", | |
"\n", | |
"inputs = p | beam.Create(fp.items())\n", | |
"all_datasets = inputs | beam.ParDo(LoadXarrayDataset())\n", | |
"schemas = all_datasets | beam.ParDo(XarrayDatasetToSchema())\n", | |
"time_lens = schemas | beam.ParDo(SchemaToDimLen(), \"time\")\n", | |
"time_len = time_lens | beam.ParDo(DropIndex()) | beam.CombineGlobally(sum)\n", | |
"first_ds = all_datasets | beam.CombineGlobally(get_first_element, \"time\")\n", | |
"target = first_ds | beam.ParDo(DummyPrepareTarget(), \"time\", beam.pvalue.AsSingleton(time_len))\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"id": "c59bd22f", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"\n", | |
" <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n", | |
" <div id=\"progress_indicator_b33df83a230e72a76d738872b71da9ed\" class=\"spinner-border text-info\" role=\"status\">\n", | |
" </div>" | |
], | |
"text/plain": [ | |
"<IPython.core.display.HTML object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n", | |
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n", | |
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n", | |
"<!-- Generated by graphviz version 2.49.1 (0)\n", | |
" -->\n", | |
"<!-- Title: G Pages: 1 -->\n", | |
"<svg width=\"514pt\" height=\"1300pt\"\n", | |
" viewBox=\"0.00 0.00 513.50 1299.95\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n", | |
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 1295.95)\">\n", | |
"<title>G</title>\n", | |
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-1295.95 509.5,-1295.95 509.5,4 -4,4\"/>\n", | |
"<!-- [10]: Create -->\n", | |
"<g id=\"node1\" class=\"node\">\n", | |
"<title>[10]: Create</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"289,-1291.95 205,-1291.95 205,-1255.95 289,-1255.95 289,-1291.95\"/>\n", | |
"<text text-anchor=\"middle\" x=\"247\" y=\"-1270.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: Create</text>\n", | |
"</g>\n", | |
"<!-- inputs -->\n", | |
"<g id=\"node2\" class=\"node\">\n", | |
"<title>inputs</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-1186.8\" rx=\"33.29\" ry=\"33.29\"/>\n", | |
"<text text-anchor=\"middle\" x=\"247\" y=\"-1183.1\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">inputs</text>\n", | |
"</g>\n", | |
"<!-- [10]: Create->inputs -->\n", | |
"<g id=\"edge1\" class=\"edge\">\n", | |
"<title>[10]: Create->inputs</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1255.72C247,-1248.3 247,-1239.28 247,-1230.26\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1230.16 247,-1220.16 243.5,-1230.16 250.5,-1230.16\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(LoadXarrayDataset) -->\n", | |
"<g id=\"node3\" class=\"node\">\n", | |
"<title>[10]: ParDo(LoadXarrayDataset)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"348,-1117.66 146,-1117.66 146,-1081.66 348,-1081.66 348,-1117.66\"/>\n", | |
"<text text-anchor=\"middle\" x=\"247\" y=\"-1095.96\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(LoadXarrayDataset)</text>\n", | |
"</g>\n", | |
"<!-- inputs->[10]: ParDo(LoadXarrayDataset) -->\n", | |
"<g id=\"edge2\" class=\"edge\">\n", | |
"<title>inputs->[10]: ParDo(LoadXarrayDataset)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1153.61C247,-1145.23 247,-1136.27 247,-1128.16\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1128.09 247,-1118.09 243.5,-1128.09 250.5,-1128.09\"/>\n", | |
"</g>\n", | |
"<!-- all_datasets -->\n", | |
"<g id=\"node4\" class=\"node\">\n", | |
"<title>all_datasets</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-992.36\" rx=\"53.09\" ry=\"53.09\"/>\n", | |
"<text text-anchor=\"middle\" x=\"247\" y=\"-988.66\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">all_datasets</text>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(LoadXarrayDataset)->all_datasets -->\n", | |
"<g id=\"edge3\" class=\"edge\">\n", | |
"<title>[10]: ParDo(LoadXarrayDataset)->all_datasets</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1081.27C247,-1074.06 247,-1065.22 247,-1055.96\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1055.8 247,-1045.8 243.5,-1055.8 250.5,-1055.8\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(XarrayDatasetToSchema) -->\n", | |
"<g id=\"node5\" class=\"node\">\n", | |
"<title>[10]: ParDo(XarrayDatasetToSchema)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"232,-903.06 0,-903.06 0,-867.06 232,-867.06 232,-903.06\"/>\n", | |
"<text text-anchor=\"middle\" x=\"116\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(XarrayDatasetToSchema)</text>\n", | |
"</g>\n", | |
"<!-- all_datasets->[10]: ParDo(XarrayDatasetToSchema) -->\n", | |
"<g id=\"edge4\" class=\"edge\">\n", | |
"<title>all_datasets->[10]: ParDo(XarrayDatasetToSchema)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M205.91,-958.33C186.13,-942.43 162.88,-923.75 144.98,-909.36\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"147.15,-906.61 137.17,-903.08 142.77,-912.07 147.15,-906.61\"/>\n", | |
"</g>\n", | |
"<!-- [10]: CombineGlobally(get_first_element) -->\n", | |
"<g id=\"node13\" class=\"node\">\n", | |
"<title>[10]: CombineGlobally(get_first_element)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"505.5,-903.06 250.5,-903.06 250.5,-867.06 505.5,-867.06 505.5,-903.06\"/>\n", | |
"<text text-anchor=\"middle\" x=\"378\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(get_first_element)</text>\n", | |
"</g>\n", | |
"<!-- all_datasets->[10]: CombineGlobally(get_first_element) -->\n", | |
"<g id=\"edge5\" class=\"edge\">\n", | |
"<title>all_datasets->[10]: CombineGlobally(get_first_element)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M288.09,-958.33C307.87,-942.43 331.12,-923.75 349.02,-909.36\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"351.23,-912.07 356.83,-903.08 346.85,-906.61 351.23,-912.07\"/>\n", | |
"</g>\n", | |
"<!-- schemas -->\n", | |
"<g id=\"node6\" class=\"node\">\n", | |
"<title>schemas</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"143\" cy=\"-789.47\" rx=\"41.69\" ry=\"41.69\"/>\n", | |
"<text text-anchor=\"middle\" x=\"143\" y=\"-785.77\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">schemas</text>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(XarrayDatasetToSchema)->schemas -->\n", | |
"<g id=\"edge6\" class=\"edge\">\n", | |
"<title>[10]: ParDo(XarrayDatasetToSchema)->schemas</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M120.95,-866.9C123.21,-859.06 126.02,-849.34 128.86,-839.49\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"132.29,-840.24 131.7,-829.66 125.56,-838.3 132.29,-840.24\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(SchemaToDimLen) -->\n", | |
"<g id=\"node7\" class=\"node\">\n", | |
"<title>[10]: ParDo(SchemaToDimLen)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"295,-691.53 97,-691.53 97,-655.53 295,-655.53 295,-691.53\"/>\n", | |
"<text text-anchor=\"middle\" x=\"196\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(SchemaToDimLen)</text>\n", | |
"</g>\n", | |
"<!-- schemas->[10]: ParDo(SchemaToDimLen) -->\n", | |
"<g id=\"edge7\" class=\"edge\">\n", | |
"<title>schemas->[10]: ParDo(SchemaToDimLen)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M160.21,-751.47C167.89,-734.95 176.77,-715.86 183.77,-700.82\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"187.01,-702.14 188.06,-691.6 180.67,-699.19 187.01,-702.14\"/>\n", | |
"</g>\n", | |
"<!-- time_lens -->\n", | |
"<g id=\"node8\" class=\"node\">\n", | |
"<title>time_lens</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"198\" cy=\"-553.03\" rx=\"46.29\" ry=\"46.29\"/>\n", | |
"<text text-anchor=\"middle\" x=\"198\" y=\"-549.33\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_lens</text>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(SchemaToDimLen)->time_lens -->\n", | |
"<g id=\"edge8\" class=\"edge\">\n", | |
"<title>[10]: ParDo(SchemaToDimLen)->time_lens</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M196.29,-655.3C196.5,-643.18 196.78,-626.22 197.06,-609.65\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"200.56,-609.49 197.23,-599.43 193.57,-609.37 200.56,-609.49\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(DropIndex) -->\n", | |
"<g id=\"node9\" class=\"node\">\n", | |
"<title>[10]: ParDo(DropIndex)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"279.5,-470.89 126.5,-470.89 126.5,-434.89 279.5,-434.89 279.5,-470.89\"/>\n", | |
"<text text-anchor=\"middle\" x=\"203\" y=\"-449.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DropIndex)</text>\n", | |
"</g>\n", | |
"<!-- time_lens->[10]: ParDo(DropIndex) -->\n", | |
"<g id=\"edge9\" class=\"edge\">\n", | |
"<title>time_lens->[10]: ParDo(DropIndex)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M200.31,-506.69C200.75,-497.99 201.2,-489.13 201.61,-481.22\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"205.11,-481.28 202.12,-471.12 198.12,-480.93 205.11,-481.28\"/>\n", | |
"</g>\n", | |
"<!-- pcoll682 -->\n", | |
"<g id=\"node10\" class=\"node\">\n", | |
"<title>pcoll682</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"212\" cy=\"-380.89\" rx=\"18\" ry=\"18\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(DropIndex)->pcoll682 -->\n", | |
"<g id=\"edge10\" class=\"edge\">\n", | |
"<title>[10]: ParDo(DropIndex)->pcoll682</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M205.22,-434.58C206.22,-426.87 207.41,-417.6 208.51,-409\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"212,-409.36 209.8,-398.99 205.05,-408.46 212,-409.36\"/>\n", | |
"</g>\n", | |
"<!-- [10]: CombineGlobally(sum) -->\n", | |
"<g id=\"node11\" class=\"node\">\n", | |
"<title>[10]: CombineGlobally(sum)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"320,-326.89 140,-326.89 140,-290.89 320,-290.89 320,-326.89\"/>\n", | |
"<text text-anchor=\"middle\" x=\"230\" y=\"-305.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(sum)</text>\n", | |
"</g>\n", | |
"<!-- pcoll682->[10]: CombineGlobally(sum) -->\n", | |
"<g id=\"edge11\" class=\"edge\">\n", | |
"<title>pcoll682->[10]: CombineGlobally(sum)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M216.27,-363.3C218.27,-355.51 220.71,-346.02 222.97,-337.22\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"226.41,-337.89 225.51,-327.33 219.63,-336.15 226.41,-337.89\"/>\n", | |
"</g>\n", | |
"<!-- time_len -->\n", | |
"<g id=\"node12\" class=\"node\">\n", | |
"<title>time_len</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"258\" cy=\"-212.64\" rx=\"42.49\" ry=\"42.49\"/>\n", | |
"<text text-anchor=\"middle\" x=\"258\" y=\"-208.94\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_len</text>\n", | |
"</g>\n", | |
"<!-- [10]: CombineGlobally(sum)->time_len -->\n", | |
"<g id=\"edge12\" class=\"edge\">\n", | |
"<title>[10]: CombineGlobally(sum)->time_len</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M235.14,-290.6C237.46,-282.79 240.33,-273.12 243.25,-263.3\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"246.67,-264.08 246.16,-253.5 239.96,-262.09 246.67,-264.08\"/>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(DummyPrepareTarget) -->\n", | |
"<g id=\"node15\" class=\"node\">\n", | |
"<title>[10]: ParDo(DummyPrepareTarget)</title>\n", | |
"<polygon fill=\"none\" stroke=\"blue\" points=\"401,-134.39 185,-134.39 185,-98.39 401,-98.39 401,-134.39\"/>\n", | |
"<text text-anchor=\"middle\" x=\"293\" y=\"-112.69\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DummyPrepareTarget)</text>\n", | |
"</g>\n", | |
"<!-- time_len->[10]: ParDo(DummyPrepareTarget) -->\n", | |
"<g id=\"edge13\" class=\"edge\">\n", | |
"<title>time_len->[10]: ParDo(DummyPrepareTarget)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M272.44,-172.76C276,-163.18 279.73,-153.14 283.02,-144.27\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"286.32,-145.44 286.52,-134.85 279.76,-143 286.32,-145.44\"/>\n", | |
"</g>\n", | |
"<!-- first_ds -->\n", | |
"<g id=\"node14\" class=\"node\">\n", | |
"<title>first_ds</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"364\" cy=\"-673.53\" rx=\"38.19\" ry=\"38.19\"/>\n", | |
"<text text-anchor=\"middle\" x=\"364\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">first_ds</text>\n", | |
"</g>\n", | |
"<!-- [10]: CombineGlobally(get_first_element)->first_ds -->\n", | |
"<g id=\"edge14\" class=\"edge\">\n", | |
"<title>[10]: CombineGlobally(get_first_element)->first_ds</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M376.85,-866.92C374.76,-835.57 370.24,-767.99 367.16,-721.88\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"370.65,-721.56 366.49,-711.82 363.67,-722.03 370.65,-721.56\"/>\n", | |
"</g>\n", | |
"<!-- first_ds->[10]: ParDo(DummyPrepareTarget) -->\n", | |
"<g id=\"edge15\" class=\"edge\">\n", | |
"<title>first_ds->[10]: ParDo(DummyPrepareTarget)</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M361.79,-635.01C359.42,-591.48 356,-517.51 356,-453.89 356,-453.89 356,-453.89 356,-307.89 356,-246.49 326.14,-179.34 307.53,-143.46\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"310.62,-141.81 302.84,-134.62 304.44,-145.09 310.62,-141.81\"/>\n", | |
"</g>\n", | |
"<!-- target -->\n", | |
"<g id=\"node16\" class=\"node\">\n", | |
"<title>target</title>\n", | |
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"293\" cy=\"-31.2\" rx=\"31.4\" ry=\"31.4\"/>\n", | |
"<text text-anchor=\"middle\" x=\"293\" y=\"-27.5\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">target</text>\n", | |
"</g>\n", | |
"<!-- [10]: ParDo(DummyPrepareTarget)->target -->\n", | |
"<g id=\"edge16\" class=\"edge\">\n", | |
"<title>[10]: ParDo(DummyPrepareTarget)->target</title>\n", | |
"<path fill=\"none\" stroke=\"black\" d=\"M293,-98.16C293,-90.69 293,-81.6 293,-72.59\"/>\n", | |
"<polygon fill=\"black\" stroke=\"black\" points=\"296.5,-72.53 293,-62.53 289.5,-72.53 296.5,-72.53\"/>\n", | |
"</g>\n", | |
"</g>\n", | |
"</svg>\n" | |
], | |
"text/plain": [ | |
"<IPython.core.display.HTML object>" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
}, | |
{ | |
"data": { | |
"application/javascript": [ | |
"\n", | |
" if (typeof window.interactive_beam_jquery == 'undefined') {\n", | |
" var jqueryScript = document.createElement('script');\n", | |
" jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", | |
" jqueryScript.type = 'text/javascript';\n", | |
" jqueryScript.onload = function() {\n", | |
" var datatableScript = document.createElement('script');\n", | |
" datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", | |
" datatableScript.type = 'text/javascript';\n", | |
" datatableScript.onload = function() {\n", | |
" window.interactive_beam_jquery = jQuery.noConflict(true);\n", | |
" window.interactive_beam_jquery(document).ready(function($){\n", | |
" \n", | |
" $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n", | |
" });\n", | |
" }\n", | |
" document.head.appendChild(datatableScript);\n", | |
" };\n", | |
" document.head.appendChild(jqueryScript);\n", | |
" } else {\n", | |
" window.interactive_beam_jquery(document).ready(function($){\n", | |
" \n", | |
" $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n", | |
" });\n", | |
" }" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"ib.show_graph(p)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"id": "a70e158a", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n", | |
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Index({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}) Frozen({'time': 2, 'lat': 18, 'lon': 36}) time 10\n" | |
] | |
} | |
], | |
"source": [ | |
"result = p.run()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "4ef5fb31", | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.9.9" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment