Skip to content

Instantly share code, notes, and snippets.

@mrocklin
Last active July 1, 2018 17:48
Show Gist options
  • Save mrocklin/3bffa7770245c2e52cd1 to your computer and use it in GitHub Desktop.
Save mrocklin/3bffa7770245c2e52cd1 to your computer and use it in GitHub Desktop.
Example using dask.dataframe together with distributed

We set up a distributed network, create a lot of pandas dataframes on the workers, then create a single logical dask.dataframe out of their futures. We then do distributed dataframe filtering operations.

$ dscheduler
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786
$ dworker 127.0.0.1:8786
In [1]: from distributed import Executor
In [2]: e = Executor('127.0.0.1:8786')

Here we create futures on distributed. Ideally this is done in the future by reading bytes from S3 and then calling read_foo for foo in {avro, parquet, csv, json, ...}

In [3]: import pandas as pd
In [4]: from pandas.util.testing import makeTimeDataFrame
In [5]: dfs = e.map(makeTimeDataFrame, range(100, 200)) 

Each element is a future pointing to a Pandas DataFrame

In [6]: dfs[5]
Out[6]: <Future: status: finished, key: makeTimeDataFrame-da698171c199311b287bdf50d1f2c9a7>
In [7]: dfs[5].result().head()
Out[7]: 
                   A         B         C         D
2000-01-03  0.447987  0.427651 -0.714716  0.624368
2000-01-04 -0.082903  0.158998 -0.330223 -0.491450
2000-01-05  0.488286 -1.027888  0.811868 -0.102420
2000-01-06 -0.965558  2.069203  0.016950  0.954196
2000-01-07 -0.050443  0.261907 -0.638817  0.117329

Wrap these many pandas.DataFrame Futures into one logical dask.dataframe

In [8]: from distributed.collections import futures_to_dask_dataframe
In [9]: df = futures_to_dask_dataframe(dfs)
In [10]: df
Out[10]: dd.DataFrame<distributed-pandas-to-dask-82d0849528189a75b417a9afb8350bf1, divisions=(None, None, None, ..., None, None)>

Now we can use dask as normal. It builds up dask graphs that parallelize across our futures. When we ask for the final result we specify the get= kwarg in order to tell dask to use the distributed scheduler.

In [11]: df.A.sum().compute(get=e.get)
Out[11]: 121.41476965260735

It's easy to forget the get= kwarg all the time, so we register it as a global default.

In [12]: import dask
In [13]: dask.set_options(get=e.get)  # global change
Out[13]: <dask.context.set_options at 0x7ff9d02bbf60>

Now we just use dask.dataframe normally and all computations are managed by distributed.

In [14]: df.A.sum().compute()
Out[14]: 121.41476965260735

In [15]: df2 = df[df.A > 0]
In [16]: df2.groupby(df2.A // 0.1).B.sum().compute()
Out[16]: 
A
0    -29.278603
1    -12.168106
2     11.839246
3    -27.266969
4     -4.472421
5     16.813963
6    -70.574865
7    -18.562929
8     16.888929
9    -23.554809
10     4.605609
11   -17.175290
12    10.343650
13    13.197777
14    15.115605
15   -22.019182
16   -23.618132
17    -0.288407
18     6.867232
19     8.371215
20    -6.357393
21     3.092484
22    -3.146922
23     2.362227
24    -7.194197
25     2.741383
26     5.675119
27    -0.851858
28     3.494459
29    -1.908012
30     0.133557
31    -1.312372
32    -2.733542
34    -0.272876
35    -0.200773
Name: B, dtype: float64
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment