Notes from investigation of Dask. Improver issue #143

Dask Notes

Introduction

Dask is a flexible parallel computing library for analytic computing. Dask is composed of two components:

  1. Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
  2. “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

Collections in Dask

Dask DataFrame mimics Pandas – I haven't paid much attention to this, as I haven't used Pandas before, but it does support HDF5 data and may have some useful functions in it. It is also good for CSV data.

import dask.dataframe as dd
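
A minimal sketch of the Pandas-like interface, assuming some hypothetical CSV files matching data-*.csv that contain a value column:

import dask.dataframe as dd

# Lazily read many CSV files as a single DataFrame (file pattern is hypothetical)
df = dd.read_csv('data-*.csv')

# Pandas-style operations build a graph; compute() executes it
mean_value = df['value'].mean().compute()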

Dask Array mimics numpy – I’ve paid most attention to this part of Dask.

import dask.array as da

Dask Bag mimics iterators, also used for semi-structured data like JSON; I’ve paid less attention to this as I don’t think we will be using this type of data much.

import dask.bag as db

Dask Delayed mimics for loops and wraps custom code

from dask import delayed

Dask Futures supports the use of Future objects to carry out tasks through a client. Similar to Delayed but immediate rather than lazy.

Graphs

Dask works by creating graphs (like cylc graphs) for the different tasks needed in order to do a calculation. The various collections (like Array, Bag and DataFrame) hide the creation of graphs from you. These are then processed by a scheduler. There is a nice graphic of this in the Dask tutorial, along with some more detailed descriptions: https://github.com/dask/dask-tutorial/blob/master/02_foundations.ipynb
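
To illustrate what a graph actually is, here is a minimal hand-built one (the kind of thing the collections build for you), executed with the simple synchronous scheduler:

from operator import add
import dask

# A graph is just a dict: keys name results, tuples are (function, arguments)
dsk = {'x': 1,
       'y': 2,
       'z': (add, 'x', 'y')}

# Ask a scheduler for the value of key 'z'; dask.get is the synchronous
# scheduler, which is handy for understanding and debugging
dask.get(dsk, 'z')   # returns 3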

Schedulers

There are two main types: one for a single machine and one for clusters. There are options to use multithreading and multiprocessing.

Inspecting Dask objects

Can use .dask on a Dask object to see the graph (this doesn't seem to print very well, but it works like a dictionary so you can use .keys(), .values() etc.). If the graphviz tools were installed, you could use from dask.dot import dot_graph (or the .visualize() method on a Dask object) to plot Dask graphs.
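
A minimal sketch (x.visualize() needs the graphviz tools mentioned above):

import dask.array as da

x = da.ones((6, 6), chunks=(3, 3)) + 1

# The graph behaves like a dictionary of tasks
list(x.dask.keys())[:3]

# With graphviz installed this writes a picture of the graph to disk
x.visualize(filename='graph.png')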

There are various tools for debugging and profiling, see here: http://dask.pydata.org/en/latest/diagnostics.html

Dask Array

This is one of the main modules we could consider using. It is compatible with HDF5: you can use it to delay the loading of arrays that are too big to fit in memory by stacking them – it creates a Dask array containing tasks that tell it how to load the data. You can also create your own Dask array using numpy-like array creation functions.
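
A minimal sketch of delayed loading from HDF5, assuming a hypothetical file data.h5 containing 2-D datasets /field0 and /field1:

import h5py
import dask.array as da

f = h5py.File('data.h5', mode='r')   # hypothetical file

# Wrap each on-disk dataset as a chunked, lazily loaded Dask array
fields = [da.from_array(f['/field%d' % i], chunks=(1000, 1000))
          for i in range(2)]

# Stack them along a new leading dimension; nothing is read yet
stacked = da.stack(fields, axis=0)

# Data is only read, chunk by chunk, when compute is called
result = stacked.mean(axis=0).compute()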

Dask Array design

Main useful page of documentation is this: http://dask.pydata.org/en/latest/array-design.html

It explains the inner workings of Dask Arrays. They are basically many numpy arrays which form chunks of a bigger array. These form part of the Dask graph. See here: http://dask.pydata.org/en/latest/array-overview.html#design

Components of a Dask Array

  1. A Dask graph with a special set of keys designating blocks such as ('x', 0, 0), ('x', 0, 1), ... (See Dask graph documentation for more details.)
  2. A sequence of chunk sizes along each dimension called chunks, for example ((5, 5, 5, 5), (8, 8, 8))
  3. A name to identify which keys in the Dask graph refer to this array, like 'x'
  4. A NumPy dtype

For example:
>>> import dask.array as da
>>> x = da.arange(0, 15, chunks=(5,))

>>> x.name
'arange-539766a'

>>> x.dask  # somewhat simplified
{('arange-539766a', 0): (np.arange, 0, 5),
 ('arange-539766a', 1): (np.arange, 5, 10),
 ('arange-539766a', 2): (np.arange, 10, 15)}

>>> x.chunks
((5, 5, 5),)

>>> x.dtype
dtype('int64')

The size of the chunks needs some thought: they must fit into memory but not be too small, and they should reflect the processing that will be carried out on the array. More guidance is available here: http://dask.pydata.org/en/latest/array-design.html#chunks and here: http://dask.pydata.org/en/latest/array-creation.html#chunks
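
For example, chunking is chosen when the array is created and can be changed later with rechunk (a minimal sketch):

import dask.array as da

# A 10000 x 10000 array split into 100 chunks of 1000 x 1000
x = da.ones((10000, 10000), chunks=(1000, 1000))

# Re-chunk, e.g. to suit an operation that works on whole rows
y = x.rechunk((100, 10000))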

Basic overview of functionality

You can write functions to return Dask arrays; see this example: http://dask.pydata.org/en/latest/array-design.html#example-eye-function. The graph is executed when you call .compute() on the Dask array.
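
A minimal sketch of how operations stay lazy until compute is called:

import dask.array as da

x = da.arange(1000000, chunks=100000)

# This only builds a graph; no computation has happened yet
total = (x ** 2).sum()

total.compute()   # the graph is executed here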

  • The Dask Array API (http://dask.pydata.org/en/latest/array-api.html) contains a lot of the numpy core interface, mostly we can replace our calls to numpy routines with calls to the Dask equivalents to delay execution.
  • Uses threaded scheduler by default to avoid data transfer cost.
  • Has some limitations, none of which I see as being of immediate concern, see here: http://dask.pydata.org/en/latest/array-overview.html#limitations
  • Can create Dask arrays from other Delayed objects using da.from_delayed, not sure if we will use this, maybe for merging arrays back together, although there are specific concatenate and stack functions too.
  • Automatically converts numpy arrays to Dask arrays when a Dask Array function is used on them, and will match the chunking when, for example, adding a numpy array to a Dask array.
  • Can save in hdf5 or any other format, using da.to_hdf5 or da.store. (see Iris2 section).
  • Does efficient calculation if only a portion of the array is needed for the calculation.
  • Use da.stack and da.concatenate to merge arrays.
  • It has the ability to use overlapping blocks; this is called ghosting. It is useful when you need small sections of overlap between the chunks within your Dask array in order to implement efficient algorithms, which could help with neighbourhood algorithms and things like gradients (a sketch follows this list). I haven't tried this functionality out yet. See diagrams here http://dask.pydata.org/en/latest/array-ghost.html#ghosting and options for how to control this further down the same page: http://dask.pydata.org/en/latest/array-ghost.html
  • Some support for sparse arrays, SciPy Linear Operator and scipy.stats.
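
A rough sketch of the overlapping-blocks (ghosting) idea, assuming the map_overlap method described on the page linked above:

import numpy as np
import dask.array as da

x = da.arange(20, chunks=5)

# Each block is passed to np.gradient together with one overlapping
# element from each neighbouring block, so values near the internal
# chunk edges are computed correctly
g = x.map_overlap(np.gradient, depth=1, boundary='reflect')

g.compute()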

Dask Bag

The most useful overview of this is here: https://github.com/dask/dask-tutorial/blob/master/03_bag.ipynb

  • It is generally used for semi-structured data. You create a bag which is like a set. It can also be used for iterators.
  • You can convert your bag to a DataFrame to apply Pandas type computations
  • You can map arbitrary functions to a bag, then compute the results (a short example follows this list)
  • Generally slower than Array or DataFrame.
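
A minimal sketch, using some hypothetical JSON-like records:

import dask.bag as db

# A bag of semi-structured records (hypothetical data)
records = db.from_sequence([{'name': 'a', 'value': 1},
                            {'name': 'b', 'value': 2},
                            {'name': 'c', 'value': 3}])

# Map an arbitrary function over the bag, then compute the results
values = records.map(lambda r: r['value'] * 10)
values.compute()            # [10, 20, 30]

# Convert to a DataFrame for Pandas-style computations
df = records.to_dataframe()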

Delayed

Used to create Delayed objects for delayed execution. It wraps normal Python functions so that processing can be built up, with .compute called on the final Delayed object at the end of the chain to generate the final result. It basically allows some parallelization by wrapping normal Python code. It is the most general part of Dask, used for customising algorithms to use Dask scheduling beyond the functions that are available in Dask Array, DataFrame etc.

Two different syntaxes are available (a short example follows the list below). You use .compute on the end result for either syntax:

  • delayed(function)(arguments) e.g. delayed(sum)(x) where x is a delayed object.
  • @delayed decorator e.g.
@delayed
def my_func(data):
... 
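
A minimal sketch showing both syntaxes and a compute at the end of the chain:

from dask import delayed

@delayed
def inc(x):
    return x + 1

# Wrap an existing function on the fly with delayed(function)(arguments)
data = [1, 2, 3]
incremented = [inc(i) for i in data]      # a list of Delayed objects
total = delayed(sum)(incremented)         # also a Delayed object

total.compute()   # the whole graph runs here, returning 9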

Most operations can be used, each creating a new Delayed. Some operations can't be used, e.g. a += 1, a[0] = 1, a.foo = 1, if a:, for i in a:. Delayed objects can't be mutated in place, used in if statements, or iterated over.

For things like random number generation, in order to create different results each time you need to pass the pure=False keyword argument to delayed. Otherwise, with pure=True, Dask can reuse keys in the graph and give the same result.
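
For example (a minimal sketch):

import random
import dask
from dask import delayed

# pure=True: identical calls share one key, so when computed together
# they return the same value
c = delayed(random.random, pure=True)()
d = delayed(random.random, pure=True)()
dask.compute(c, d)   # the same number twice

# pure=False: each call gets its own key, so the values differ
a = delayed(random.random, pure=False)()
b = delayed(random.random, pure=False)()
dask.compute(a, b)   # two different numbers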

It can be used to make objects lazy. Can be used with things like DataFrames for custom data formats to create delayed objects etc.

More detailed API here: http://dask.pydata.org/en/latest/delayed-api.html

Scheduling

These schedulers are available:

  • dask.threaded.get: a scheduler backed by a thread pool
  • dask.multiprocessing.get: a scheduler backed by a process pool
  • dask.get: a synchronous scheduler, good for debugging
  • distributed.Client.get: a distributed scheduler for executing graphs on multiple machines. This lives in the external distributed project.

The get function works on keys in a Dask graph. When working with Dask collections (Array, Bag etc) you normally use .compute instead. Each collection has its own default scheduler:

  • dask.array and dask.dataframe use the threaded scheduler by default
  • dask.bag uses the multiprocessing scheduler by default.

You can also use top-level compute function from each collection to compute multiple results, sharing intermediate results.
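
A minimal sketch of the top-level compute sharing intermediate results:

import dask
import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T

# Computing both results in one call lets the scheduler reuse the
# intermediate graph for y rather than executing it twice
y_min, y_max = dask.compute(y.min(), y.max())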

You can modify the default schedulers, see here: http://dask.pydata.org/en/latest/scheduler-overview.html#configuring-the-schedulers and use specific schedulers for debugging.

Futures

This supports submitting work to a client for arbitrary task scheduling, but is immediate rather than lazy. Not sure we really want to do this; the default behaviour and the information in the Schedulers section above may be more useful. The steps are:

  1. from dask.distributed import Client
  2. Start a client with client = Client(), or client = Client(processes=False) to use multithreading rather than multiprocessing.
  3. Submit tasks to the client using a_future = client.submit(my_func, args), or use client.map if you want to call the same function on many inputs. This returns a future which refers to the remote result; it also has a status showing whether it has completed yet. You can submit tasks that depend on futures already submitted.
  4. Ask for the result back explicitly using a_future.result(). This blocks until the result is ready. You can also use client.gather(futures) to collect multiple results from memory, which may be quicker. (A minimal example follows these steps.)
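
A minimal sketch of those steps (assumes the dask.distributed package is installed):

from dask.distributed import Client

def square(x):
    return x ** 2

client = Client(processes=False)     # local client using threads

# Submit individual tasks; each returns a Future immediately
future = client.submit(square, 10)
future.result()                      # blocks until the result (100) is ready

# Or map the same function over many inputs and gather all the results
futures = client.map(square, range(8))
client.gather(futures)               # [0, 1, 4, 9, ...]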

You can send data to the client by passing it normally (as an argument to a processing function) or you can use client.scatter(data), which may be quicker.

There is an overhead of about 1ms so for large numbers of inputs you are better off using Bags or Dataframe.

Futures are garbage collected, but you can also cancel them explicitly with future.cancel().

Dask raises exceptions and tracebacks from the remote worker if something has gone wrong; these are visible when you try to access the result. For more efficient use you can loop over futures as they complete (with_results=True gathers the results in the background):

for future, result in as_completed(futures, with_results=True):
   ...  # process each result as soon as its task finishes

Use fire_and_forget(future) where, for example, the future is a submitted task that writes results to a file: you don't care about the result and it won't be returned.

Tasks can submit their own tasks by accessing their own client, but you need to be careful not to overload.

API here: http://dask.pydata.org/en/latest/futures.html#api

Dask Distributed

This has its own separate documentation (here: https://distributed.readthedocs.io/en/latest/ ) but it is really another scheduler, for using multiple machines or nodes. It basically extends Futures and Dask to clusters. You can start your own client from the command line (or in the Python session) and then submit work to it, or use delayed_object.compute() as normal. There is also a web interface to see what is going on. This notebook gives the best example: https://github.com/dask/dask-tutorial/blob/master/04_distributed.ipynb
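
A minimal sketch: once a Client exists it becomes the default scheduler, so ordinary Dask work runs on the cluster (here just a local one):

from dask.distributed import Client
import dask.array as da

# Client() with no arguments starts a local set of workers; pass a
# scheduler address instead to use a real cluster
client = Client()

x = da.random.random((20000, 20000), chunks=(2000, 2000))
result = x.mean().compute()   # executed by the distributed scheduler

client.close()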

Advanced use gives control over submitting work to the client and gathering results, as in the Futures section above. There are some examples of this and some ideas for debugging in this notebook: https://github.com/dask/dask-tutorial/blob/master/06_distributed_advanced.ipynb

Note dask-ssh (a way of setting up clients over multiple machines) does not work on desktop as one of its pre-requisites is not available.

It should be possible to use with job schedulers like SLURM etc, but I haven’t looked into this properly yet. More technical details are available here: http://dask.pydata.org/en/latest/distributed.html

Iris2 and Dask

Iris 2 latest documentation is available here: https://scitools-docs.github.io/iris/master/index.html

Main changes outlined here: https://scitools-docs.github.io/iris/master/whatsnew/2.0a0.html and discussion of lazy data here: https://scitools-docs.github.io/iris/master/userguide/real_and_lazy_data.html

The main points are:

  • Uses Dask rather than Biggus for lazy data, seen by calling cube.core_data(), so Iris cubes will contain Dask arrays on load (see the sketch after this list).
  • Doesn’t support masked data, only NaNs
  • Integer data is calculated as floats and then recast back to integers, which may affect some calculations.
  • Don’t try and save data to the source file, you will lose data.
  • Uses da.store to save the data within Iris, so the data can still be lazy on save, making best use of Dask by not realising values unnecessarily and instead saving results straight to disk.
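
A minimal sketch of what this looks like in Iris 2, assuming a hypothetical input file:

import iris

cube = iris.load_cube('input.nc')   # hypothetical file

cube.has_lazy_data()     # True - nothing has been read into memory yet
cube.core_data()         # the underlying Dask array

result = cube * 2        # arithmetic keeps the data lazy

# Saving uses da.store under the hood, so the lazy data is streamed
# straight to disk without being realised in memory first
iris.save(result, 'output.nc')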

Other Uses

Dask can be used for machine learning and with other distributed systems, I’ve not looked at this: http://dask.pydata.org/en/latest/machine-learning.html

Useful links

Dask Cheat Sheet: http://dask.pydata.org/en/latest/cheatsheet.html

Dask Array API: http://dask.pydata.org/en/latest/array-api.html

Dask Delayed API: http://dask.pydata.org/en/latest/delayed-api.html

Dask Futures API: http://dask.pydata.org/en/latest/futures.html#api

Dask diagnostics: http://dask.pydata.org/en/latest/diagnostics.html

Latest Iris documentation: https://scitools-docs.github.io/iris/master/index.html

Dask tutorial: https://github.com/dask/dask-tutorial

Dask Distributed: https://distributed.readthedocs.io/en/latest/
