An example of using Dask with a Prefect `resource_manager`
"""
For workloads where most of the grunt work is *driven* by Prefect but
performed on some external system like Dask, it makes more sense to use
Prefect to drive Dask rather than to run Prefect inside Dask.

If you want your Prefect flow to start up a Dask cluster, you'll want to
ensure all resources are still cleaned up properly, even if the flow fails.
To do this, you can make use of a `prefect.resource_manager`. This mirrors
the `contextmanager` pattern you may be familiar with in Python, but makes
it work with Prefect tasks. See
https://docs.prefect.io/core/idioms/resource-manager.html for more information.

Below we create a resource manager for spinning up a Dask cluster, and then
use it to drive some Dask tasks as part of a larger flow. You should be able
to apply this same pattern to your existing flows.
"""
import dask
from dask.distributed import Client

from prefect import Flow, Parameter, task, resource_manager


@resource_manager
class DaskCluster:
    """Create a temporary dask cluster.

    Args:
        - cluster_kwargs (dict): options to forward to the backing Dask cluster manager
    """

    def __init__(self, cluster_kwargs=None):
        self.cluster_kwargs = cluster_kwargs or {}

    def setup(self):
        """Create a temporary dask cluster, returning the `Client`"""
        # Here we use a LocalCluster, but you could just as well use any
        # other dask cluster manager.
        from dask.distributed import LocalCluster

        cluster = LocalCluster(**self.cluster_kwargs)
        # Another option would be to use dask-kubernetes. See
        # https://kubernetes.dask.org/en/latest/kubecluster.html for more info.
        #
        # from dask_kubernetes import KubeCluster
        # cluster = KubeCluster()  # you'll likely want some config specific to your deployment here
        # cluster.adapt()
        return Client(cluster)
    def cleanup(self, client):
        """Shut down the temporary dask cluster"""
        cluster = client.cluster
        # Ignore any errors shutting down the client; they're innocuous
        try:
            client.close()
        except Exception:
            pass
        # Shut down the cluster. An error here will be marked by Prefect and
        # will show up in the UI (if using Cloud/Server), but won't fail the
        # flow. This lets you observe the failure and investigate it if you
        # want to.
        cluster.close()
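
# Usage sketch: `cluster_kwargs` is forwarded to the backing cluster
# constructor, so a flow can size the LocalCluster explicitly. The keyword
# values here are illustrative only:
#
#     with DaskCluster(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}) as client:
#         ...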
@task
def load_data():
    """Load some data"""
    return dask.datasets.timeseries()


@task
def summarize(df):
    """Transform the data. This automatically uses the dask cluster created above"""
    return df.describe().compute()
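
# Note: `df.describe().compute()` materializes the summary as an in-memory
# pandas DataFrame rather than a lazy dask collection, which is why
# `write_csv` below can run after the cluster has been shut down.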
@task
def write_csv(df, path):
    """Write the data to disk. This task doesn't use dask at all."""
    return df.to_csv(path, index_label="index")


with Flow("example") as flow:
    out_path = Parameter("out_path", default="example.csv")

    with DaskCluster() as client:
        df = load_data()
        # This task makes use of the existing dask cluster. Dask will
        # automatically use the client created above, so you don't need to
        # pass in the `client` argument unless you want to reference it
        # directly.
        df2 = summarize(df)

    # On exiting the above block, the dask cluster will be shut down. This
    # task doesn't rely on the dask cluster to run, so it doesn't need to be
    # under the `DaskCluster` context.
    write_csv(df2, out_path)


if __name__ == "__main__":
    flow.run()
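
# To override the `out_path` parameter at runtime, pass a `parameters` dict
# to `flow.run` (the path below is just an example):
#
#     flow.run(parameters={"out_path": "/tmp/summary.csv"})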