Dask is a flexible parallel computing library for analytic computing. Dask is composed of two components:
- Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
- “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
• Dask DataFrame mimics Pandas – I’ve not paid too much attention to this, as I haven’t used pandas before. But it does support HDF5 data and may have some useful functions in it. It is also good for CSV data.
import dask.dataframe as dd
• Dask Array mimics numpy – I’ve paid most attention to this part of Dask.
import dask.array as da
• Dask Bag mimics iterators, also used for semi-structured data like JSON; I’ve paid less attention to this as I don’t think we will be using this type of data much.
import dask.bag as db
• Dask Delayed mimics for loops and wraps custom code
from dask import delayed
• Dask Futures supports the use of Future objects to carry out tasks through a client. Similar to Delayed but immediate rather than lazy.
Dask works by creating graphs (like cylc graphs) for the different tasks needed in order to do a calculation. The various collections (like Array, Bag and DataFrame) hide the creation of graphs for you. These are then processed by a scheduler. There is a nice graphic of this on the Dask tutorial, along with some more detailed descriptions: https://github.com/dask/dask-tutorial/blob/master/02_foundations.ipynb
There are two main types of scheduler: single-machine and distributed (cluster). There are options to use multithreading and multiprocessing.
You can use .dask on a Dask object to see the graph (this doesn’t print very well, but it works like a dictionary so you can use .keys(), .values() etc.). If we had the graphviz tools installed, you could do from dask.dot import dot_graph to plot Dask graphs.
There are various tools for debugging and profiling, see here: http://dask.pydata.org/en/latest/diagnostics.html
This is one of the main modules we could consider using. It is compatible with HDF5: you can use it to do delayed loading of arrays that are too big to fit in memory by stacking them – it creates a Dask array containing tasks telling it how to load the data. You can also create your own Dask array using numpy-like array creation functions.
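As a minimal sketch of that delayed-loading pattern (the filename, dataset path and chunk sizes below are made up for illustration):
import h5py
import dask.array as da

# Open the HDF5 file read-only and wrap the on-disk dataset in a Dask array;
# nothing is read into memory yet, the graph just records how to load each chunk
f = h5py.File('mydata.h5', mode='r')
dset = f['/model/temperature']
x = da.from_array(dset, chunks=(1000, 1000))

# Operations stay lazy; data is only read when .compute() is called
mean_field = x.mean(axis=0).compute()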
Main useful page of documentation is this: http://dask.pydata.org/en/latest/array-design.html
It explains the inner workings of Dask Arrays. They are basically many numpy arrays which form chunks of a bigger array. These form part of the Dask graph. See here: http://dask.pydata.org/en/latest/array-overview.html#design
- A Dask graph with a special set of keys designating blocks such as ('x', 0, 0), ('x', 0, 1), ... (See Dask graph documentation for more details.)
- A sequence of chunk sizes along each dimension called chunks, for example ((5, 5, 5, 5), (8, 8, 8))
- A name to identify which keys in the Dask graph refer to this array, like 'x'
- A NumPy dtype
For example:
>>> import dask.array as da
>>> x = da.arange(0, 15, chunks=(5,))
>>> x.name
'arange-539766a'
>>> x.dask # somewhat simplified
{('arange-539766a', 0): (np.arange, 0, 5),
('arange-539766a', 1): (np.arange, 5, 10),
('arange-539766a', 2): (np.arange, 10, 15)}
>>> x.chunks
((5, 5, 5),)
>>> x.dtype
dtype('int64')
Chunk size needs some thought: chunks need to fit into memory but not be too small, and should reflect the processing that will be carried out on the array. More guidance is available here: http://dask.pydata.org/en/latest/array-design.html#chunks and here: http://dask.pydata.org/en/latest/array-creation.html#chunks
You can write functions to return Dask arrays. See this example: http://dask.pydata.org/en/latest/array-design.html#example-eye-function The graph is only executed when you call .compute() on the Dask array.
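A small sketch of this pattern, with an illustrative chunk size (the function and sizes are made up):
import dask.array as da

def anomaly(shape=(10000, 10000), chunks=(1000, 1000)):
    # Builds a lazy Dask array; nothing is computed in here
    x = da.random.random(shape, chunks=chunks)
    return x - x.mean()

arr = anomaly()                  # still lazy: just a graph and metadata
result = arr.sum().compute()     # the graph is executed here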
- The Dask Array API (http://dask.pydata.org/en/latest/array-api.html) contains a lot of the numpy core interface; mostly we can replace our calls to numpy routines with calls to the Dask equivalents to delay execution.
- Uses threaded scheduler by default to avoid data transfer cost.
- Has some limitations, none of which I see as being of immediate concern, see here: http://dask.pydata.org/en/latest/array-overview.html#limitations
- Can create Dask arrays from other Delayed objects using da.from_delayed; not sure if we will use this, maybe for merging arrays back together, although there are specific concatenate and stack functions too.
- Automatically makes Dask arrays from numpy arrays when using a Dask Array function on a numpy array, and will match chunking if, for example, adding a numpy array to a Dask array.
- Can save in HDF5 or any other format, using da.to_hdf5 or da.store (see the Iris 2 section).
- Does efficient calculation if only a portion of the array is needed for the calculation.
- Use da.stack and da.concatenate to merge arrays (see the sketch after this list).
- It can use overlapping blocks; this is called ghosting. This is useful when you need small sections of overlap between the chunks within your Dask array in order to do efficient algorithms. This could be useful for neighbourhood algorithms and things like gradients. I haven’t tried this functionality out yet. See diagrams here http://dask.pydata.org/en/latest/array-ghost.html#ghosting and options for how to control this further down the same page: http://dask.pydata.org/en/latest/array-ghost.html
- Some support for sparse arrays, SciPy Linear Operator and scipy.stats.
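A quick sketch of merging arrays with stack and concatenate (the shapes and chunks are arbitrary):
import numpy as np
import dask.array as da

a = da.from_array(np.arange(12).reshape(3, 4), chunks=(3, 2))
b = da.from_array(np.arange(12, 24).reshape(3, 4), chunks=(3, 2))

stacked = da.stack([a, b], axis=0)        # adds a new dimension: shape (2, 3, 4)
joined = da.concatenate([a, b], axis=0)   # joins along an existing dimension: shape (6, 4)
print(stacked.shape, joined.shape)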
The most useful overview of this is here: https://github.com/dask/dask-tutorial/blob/master/03_bag.ipynb
- It is generally used for semi-structured data. You create a bag which is like a set. It can also be used for iterators.
- You can convert your bag to a DataFrame to apply Pandas type computations
- You can map arbitrary functions to a bag, then compute the results
- Generally slower than Array or DataFrame.
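A small illustrative example of the Bag workflow (the records are made up):
import dask.bag as db

# A bag of semi-structured records, e.g. parsed JSON lines
records = db.from_sequence([
    {'name': 'a', 'value': 1},
    {'name': 'b', 'value': 2},
    {'name': 'c', 'value': 3},
])

# Map an arbitrary function over the bag, then compute the results
tens = records.map(lambda rec: rec['value'] * 10)
print(tens.compute())   # [10, 20, 30]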
Used to create Delayed objects for delayed execution. Used to wrap normal python functions and build up processing, calling .compute() on the final delayed object at the end of the chain to generate the final result. Basically allows some parallelization by wrapping normal python. It is the most general part of Dask, used for customising algorithms to use Dask scheduling, beyond the functions that are available in Dask Array, DataFrame etc.
Two different syntaxes are available; you call .compute() on the end result for either:
- delayed(function)(arguments), e.g. delayed(sum)(x) where x is a delayed object.
- The @delayed decorator, e.g.
@delayed
def my_func(data):
    ...
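Putting both syntaxes together, a minimal sketch (the functions here are made up):
from dask import delayed

@delayed
def double(x):
    return 2 * x

doubled = [double(i) for i in range(4)]   # a list of Delayed objects
total = delayed(sum)(doubled)             # wrap a normal function around them
print(total.compute())                    # graph executed here -> 12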
Most operations can be used, each creating a new Delayed. Some operations can’t be used, e.g. a += 1, a[0] = 1, a.foo = 1, if a:, for i in a:. Delayed objects can’t be used in if statements and they can’t be iterated over.
For things like random number generation, in order to create different results each time you need to add the pure=False keyword argument to delayed. Otherwise, with pure=True, Dask can reuse keys in the graph and give the same result each time.
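A small sketch of the difference (assuming the pure keyword is passed to delayed as below):
import random
from dask import delayed

# With pure=False each call gets a new key in the graph, so the results differ
r1 = delayed(random.random, pure=False)()
r2 = delayed(random.random, pure=False)()
print(r1.key == r2.key)   # False

# With pure=True Dask treats the call as deterministic and reuses the key,
# so both objects give the same value
s1 = delayed(random.random, pure=True)()
s2 = delayed(random.random, pure=True)()
print(s1.key == s2.key)   # True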
It can be used to make objects lazy, and can be used with things like DataFrames for custom data formats to create delayed objects etc.
More detailed API here: http://dask.pydata.org/en/latest/delayed-api.html
These schedulers are available:
- dask.threaded.get: a scheduler backed by a thread pool
- dask.multiprocessing.get: a scheduler backed by a process pool
- dask.get: a synchronous scheduler, good for debugging
- distributed.Client.get: a distributed scheduler for executing graphs on multiple machines. This lives in the external distributed project.
The get function works on keys in a Dask graph. When working with Dask collections (Array, Bag etc.) you normally use .compute() instead. Each collection has its own default scheduler:
- dask.array and dask.dataframe use the threaded scheduler by default
- dask.bag uses the multiprocessing scheduler by default.
You can also use the top-level compute function from each collection to compute multiple results at once, sharing intermediate results.
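As a rough sketch of computing several results in one pass (the array and operations are arbitrary):
import dask
import dask.array as da

x = da.random.random((4000, 4000), chunks=(1000, 1000))
y = x + x.T

# Both results are computed together, so the intermediate y is only built once
y_min, y_max = dask.compute(y.min(), y.max())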
You can modify the default schedulers (see here: http://dask.pydata.org/en/latest/scheduler-overview.html#configuring-the-schedulers) and use specific schedulers for debugging.
This supports submitting work to a client for arbitrary task scheduling, but it is immediate rather than lazy. I’m not sure we really want to do this; the default behaviour and the information in the Schedulers section in the introduction may be more useful. The steps are:
from dask.distributed import Client
- Start a client with client = Client(), or client = Client(processes=False), to control whether multiprocessing or multithreading is used.
- Submit tasks to the client using a_future = client.submit(my_func, args), or use client.map if you want to call the same thing on many inputs. This returns a future which refers to the remote result and has a status showing whether it has completed yet. You can submit tasks that depend on futures already submitted.
- Ask for the result back explicitly using a_future.result(). This blocks until the result is ready. You can also use client.gather(futures) to get multiple results back from memory, which may be quicker.
You can send data to the Client by submitting it normally (as an argument to a processing function), or you can use client.scatter(data), which may be quicker.
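A minimal sketch of these steps (the worker function and data are made up):
from dask.distributed import Client

def process(x):
    return x ** 2

client = Client(processes=False)           # local threads; Client() uses processes

a_future = client.submit(process, 10)      # returns a future straight away
futures = client.map(process, range(5))    # same function over many inputs

print(a_future.result())                   # blocks until ready -> 100
print(client.gather(futures))              # [0, 1, 4, 9, 16]

# Larger inputs can be scattered to the workers up front
[data_future] = client.scatter([list(range(1000))])
total = client.submit(sum, data_future)
print(total.result())

client.close()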
There is an overhead of about 1 ms per task, so for large numbers of inputs you are better off using Bags or DataFrames.
Futures are garbage collected, but you can also cancel them explicitly with future.cancel().
Dask raises exceptions and tracebacks from the remote if something has gone wrong. These are visible when you try to access the result.
For more efficient use you can loop over things as they complete (with_results=True gathers the results in the background):
from dask.distributed import as_completed

for future, result in as_completed(futures, with_results=True):
    ...
Use fire_and_forget(future) when you don’t care about the result and it won’t be returned, for example where future is the submission of a task that writes to a file.
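A short sketch of this (the write function and path are hypothetical):
from dask.distributed import Client, fire_and_forget

client = Client(processes=False)

def write_result(value, path):
    # side-effecting task: we never need the return value back
    with open(path, 'w') as f:
        f.write(str(value))

future = client.submit(write_result, 42, '/tmp/result.txt')
fire_and_forget(future)   # the task still runs, but its result is never gathered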
Tasks can submit their own tasks by accessing their own client, but you need to be careful not to overload.
API here: http://dask.pydata.org/en/latest/futures.html#api
This has its own separate documentation (here: https://distributed.readthedocs.io/en/latest/ ) but it is really another scheduler for using multiple machines or nodes. It basically extends Futures and Dask to clusters. You can start your own clients using the command line (or in the python session) and then submit work to them or use Delayed_object.compute() as normal. There is also a web interface to see what is going on. This notebook gives the best example: https://github.com/dask/dask-tutorial/blob/master/04_distributed.ipynb
Advanced use gives control over submitting work to the client and gathering results as in the Futures section below. There are some examples of this and some ideas for debugging in this notebook: https://github.com/dask/dask-tutorial/blob/master/06_distributed_advanced.ipynb
Note dask-ssh (a way of setting up a cluster over multiple machines) does not work on the desktop as one of its prerequisites is not available.
It should be possible to use with job schedulers like SLURM etc, but I haven’t looked into this properly yet. More technical details are available here: http://dask.pydata.org/en/latest/distributed.html
Iris 2 latest documentation is available here: https://scitools-docs.github.io/iris/master/index.html
Main changes outlined here: https://scitools-docs.github.io/iris/master/whatsnew/2.0a0.html and discussion of lazy data here: https://scitools-docs.github.io/iris/master/userguide/real_and_lazy_data.html
The main points are:
- Uses Dask rather than Biggus for lazy data, seen by calling cube.core_data(), so Iris cubes will contain Dask arrays upon load.
- Doesn’t support masked data, only NaNs.
- Integer data is calculated as floats and then recast back to integers, which may affect some calculations.
- Don’t try to save data to the source file, or you will lose data.
- Uses da.store to save the data within Iris, so the data can still be lazy upon save, making best use of Dask by saving results straight to disk rather than returning values unnecessarily.
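As a rough sketch of how lazy data shows up in Iris 2 (the filenames here are hypothetical):
import iris

cube = iris.load_cube('air_temperature.nc')
print(cube.has_lazy_data())    # True: nothing has been read yet
print(type(cube.core_data()))  # a Dask array

result = cube + 273.15         # arithmetic stays lazy
iris.save(result, 'output.nc') # data streamed to disk via da.store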
Dask can be used for machine learning and with other distributed systems, I’ve not looked at this: http://dask.pydata.org/en/latest/machine-learning.html
Dask Cheat Sheet: http://dask.pydata.org/en/latest/cheatsheet.html
Dask Array API: http://dask.pydata.org/en/latest/array-api.html
Dask Delayed API: http://dask.pydata.org/en/latest/delayed-api.html
Dask Futures API: http://dask.pydata.org/en/latest/futures.html#api
Dask diagnostics: http://dask.pydata.org/en/latest/diagnostics.html
Latest Iris documentation: https://scitools-docs.github.io/iris/master/index.html
Dask tutorial: https://github.com/dask/dask-tutorial
Dask Distributed: https://distributed.readthedocs.io/en/latest/