We set up a distributed network, create many pandas DataFrames on the workers, and then combine their futures into a single logical dask.dataframe. We then run distributed dataframe computations such as filtering and groupby aggregations.
$ dscheduler
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786
In [1]: from distributed import Executor
In [2]: e = Executor('127.0.0.1:8786')
Here we create futures on the cluster. Ideally this would be done by reading bytes from S3 and then calling read_foo for each format foo in {avro, parquet, csv, json, ...}.
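A minimal sketch of that file-reading pattern, assuming a list of CSV paths that every worker can read (the bucket and filenames below are made up):
# hypothetical file listing; any paths readable by the workers would do
filenames = ['s3://my-bucket/trades-2000-01-%02d.csv' % i for i in range(1, 31)]
# one future per file, each pointing to a pandas DataFrame held on some worker
file_dfs = e.map(pd.read_csv, filenames)
For this walkthrough we instead generate random dataframes with a pandas testing utility: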
In [3]: import pandas as pd
In [4]: from pandas.util.testing import makeTimeDataFrame
In [5]: dfs = e.map(makeTimeDataFrame, range(100, 200))
Each element is a future pointing to a Pandas DataFrame
In [6]: dfs[5]
Out[6]: <Future: status: finished, key: makeTimeDataFrame-da698171c199311b287bdf50d1f2c9a7>
In [7]: dfs[5].result().head()
Out[7]:
A B C D
2000-01-03 0.447987 0.427651 -0.714716 0.624368
2000-01-04 -0.082903 0.158998 -0.330223 -0.491450
2000-01-05 0.488286 -1.027888 0.811868 -0.102420
2000-01-06 -0.965558 2.069203 0.016950 0.954196
2000-01-07 -0.050443 0.261907 -0.638817 0.117329
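We can also pull several futures back at once with the executor's gather method (a small sketch; gathering everything locally would defeat the purpose for genuinely large data):
first_three = e.gather(dfs[:3])   # a list of three pandas DataFrames, fetched together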
Now we wrap these many pandas.DataFrame futures into one logical dask.dataframe.
In [8]: from distributed.collections import futures_to_dask_dataframe
In [9]: df = futures_to_dask_dataframe(dfs)
In [10]: df
Out[10]: dd.DataFrame<distributed-pandas-to-dask-82d0849528189a75b417a9afb8350bf1, divisions=(None, None, None, ..., None, None)>
Now we can use dask as normal. It builds up dask graphs that parallelize across our futures. When we ask for a final result we pass the get= keyword argument to tell dask to use the distributed scheduler.
In [11]: df.A.sum().compute(get=e.get)
Out[11]: 121.41476965260735
It's easy to forget the get= keyword argument each time, so we register it as the global default.
In [12]: import dask
In [13]: dask.set_options(get=e.get) # global change
Out[13]: <dask.context.set_options at 0x7ff9d02bbf60>
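If we only want the distributed scheduler for a particular block of code, set_options can also be used as a context manager rather than a global setting (a small sketch):
with dask.set_options(get=e.get):      # the change only applies inside this block
    total = df.A.sum().compute()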
Now we just use dask.dataframe normally and all computations are managed by distributed.
In [14]: df.A.sum().compute()
Out[14]: 121.41476965260735
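Several results can also be evaluated in a single pass so that dask can share intermediate work between them; a sketch, assuming a dask version that provides the top-level dask.compute function:
total, count = dask.compute(df.A.sum(), df.A.count())   # both reductions in one call
average = total / count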
In [15]: df2 = df[df.A > 0]
In [16]: df2.groupby(df2.A // 0.1).B.sum().compute()
Out[16]:
A
0 -29.278603
1 -12.168106
2 11.839246
3 -27.266969
4 -4.472421
5 16.813963
6 -70.574865
7 -18.562929
8 16.888929
9 -23.554809
10 4.605609
11 -17.175290
12 10.343650
13 13.197777
14 15.115605
15 -22.019182
16 -23.618132
17 -0.288407
18 6.867232
19 8.371215
20 -6.357393
21 3.092484
22 -3.146922
23 2.362227
24 -7.194197
25 2.741383
26 5.675119
27 -0.851858
28 3.494459
29 -1.908012
30 0.133557
31 -1.312372
32 -2.733542
34 -0.272876
35 -0.200773
Name: B, dtype: float64
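Other dask.dataframe operations follow the same pattern. For example, a couple more computations on the filtered frame (a sketch; outputs are omitted because the data is random):
df2.B.mean().compute()                          # mean of B among rows where A > 0
df2.groupby(df2.A // 0.1).B.count().compute()   # how many rows fall into each A bin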