Created February 5, 2021 20:46
An example of using Dask with a Prefect `resource_manager`
""" | |
For workloads where most of the grunt work is *driven* by prefect, but done | |
using some external system like dask, it makes more sense to use Prefect to | |
drive Dask rather than running Prefect inside Dask. | |
If you want your prefect Flow to startup a dask cluster, you'll want to ensure | |
all resources are still cleaned up properly, even in the case of Flow failure. | |
To do this, you can make use of a `prefect.resource_manager`. This mirrors the | |
`contextmanager` pattern you may be familiar with in Python, but makes it work | |
with Prefect tasks. See | |
https://docs.prefect.io/core/idioms/resource-manager.html for more information. | |
Below we create a resource manager for spinning up a Dask cluster, and then use | |
that to drive some dask tasks as part of a larger flow. You should be able to | |
apply this same pattern to your existing flows. | |
""" | |
import dask | |
from prefect import Flow, Parameter, task, resource_manager | |
from dask.distributed import Client | |
@resource_manager | |
class DaskCluster: | |
"""Create a temporary dask cluster. | |
Args: | |
- cluster_kwargs (dict): options to forward to the backing Dask cluster manager | |
""" | |
def __init__(self, cluster_kwargs=None): | |
self.cluster_kwargs = cluster_kwargs or {} | |
def setup(self): | |
"""Create a temporary dask cluster, returning the `Client`""" | |
# Here we use a LocalCluster, but you could just as well use any other dask cluster manager. | |
from dask.distributed import LocalCluster | |
cluster = LocalCluster(**self.cluster_kwargs) | |
# Another option would be to use dask-kubernetes. | |
# See https://kubernetes.dask.org/en/latest/kubecluster.html for more info. | |
# | |
# from dask_kubernetes import KubeCluster | |
# cluster = KubeCluster() # you'll likely want some config specific to your deployment here | |
# cluster.adapt() | |
return Client(cluster) | |
def cleanup(self, client): | |
"""Shutdown the temporary dask cluster""" | |
cluster = client.cluster | |
# Ignore any errors shutting down the client, they're innocuous | |
try: | |
client.close() | |
except Exception: | |
pass | |
# Shutdown the cluster. An error here will be marked by prefect and | |
# show up in the UI (if using Cloud/server), but won't fail the flow. | |
# It will let you observe the failure, so if it's something you want to | |
# investigate you can. | |
cluster.close() | |
@task | |
def load_data(): | |
"""Load some data""" | |
return dask.datasets.timeseries() | |
@task | |
def summarize(df): | |
"""Transform the data. This automatically uses the dask cluster created above""" | |
return df.describe().compute() | |
@task | |
def write_csv(df, path): | |
"""Write the data to disk. This task doesn't use dask at all.""" | |
return df.to_csv(path, index_label="index") | |
with Flow("example") as flow: | |
out_path = Parameter("out_path", default="example.csv") | |
with DaskCluster() as client: | |
df = load_data() | |
# This task makes use of the existing dask cluster. Dask will | |
# automatically use the client created above, so you don't need to pass | |
# in the `client` argument unless you want to reference it directly. | |
df2 = summarize(df) | |
# On exiting from the above block, the dask cluster will be shutdown. | |
# This task doesn't rely on the dask cluster to run, so it doesn't need to | |
# be under the `DaskCluster` context | |
write_csv(df2, out_path) | |
if __name__ == "__main__": | |
flow.run() |
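The docstring above says `resource_manager` mirrors Python's `contextmanager` pattern. If that pattern is unfamiliar, here is a minimal stdlib-only sketch of the same setup/cleanup guarantee, with no Prefect or Dask involved (`temporary_resource` and the `events` list are hypothetical stand-ins for cluster setup and teardown):

```python
from contextlib import contextmanager

@contextmanager
def temporary_resource(events):
    # Stand-in for `DaskCluster.setup`: runs before the `with` body.
    events.append("setup")
    try:
        yield events
    finally:
        # Stand-in for `DaskCluster.cleanup`: runs after the body,
        # even if the body raised an exception.
        events.append("cleanup")

log = []
try:
    with temporary_resource(log):
        raise RuntimeError("task failed")
except RuntimeError:
    pass

print(log)  # ['setup', 'cleanup'] -- cleanup still ran despite the failure
```

This is the guarantee `DaskCluster` gives the flow: `cleanup` (and thus `cluster.close()`) runs even when a task inside the `with DaskCluster()` block fails.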