
@pryce-turner
Last active July 26, 2024 12:16
Processing Files with Dynamic Container Tasks in Flyte

Motivation

I'm still very new to Flyte, but it's quickly becoming one of my favorite tools. Between its modern architectural decisions and its welcoming, extremely helpful community, what's not to love?

Some of the abstractions, however, do require a slight shift in assumptions compared to more localized workflow frameworks. Additionally, while the documentation is excellent at explaining Flyte's different features in isolation, there aren't as many posts on how they work together as there are for more established projects. This short piece is my attempt to capture my journey so far with a rather specific use case that ties together a number of notable features. It's mostly to solidify my own understanding, but hopefully it's also a way to pay forward all the wonderful support I've received from the team.

Setup

  • Everything you need to run the code below is covered in the Environment Setup guide in the official docs.
  • Upload some files to your local minio storage:
    • Open a browser to http://localhost:30080/minio/browser
    • Log in with username minio and password miniostorage
    • Create a path in my-s3-bucket called input-data
    • Create some files with some content:
      • echo hello > hello.txt
      • echo world > world.txt
    • Upload both files to your new path
  • Make a Python file to contain the workflow, e.g. filebased_wf.py, and add the following imports to the top:
    import os
    from typing import List
    from flytekit import kwtypes, workflow, dynamic, task, ContainerTask
    from flytekit.types.file import FlyteFile
    from flytekit.types.directory import FlyteDirectory
    

Container Task

First we'll define a generic ContainerTask that will handle the actual processing of the files.

ct = ContainerTask(
    name="sed_task",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(infile=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="alpine",
    command=[
        "/bin/sh",
        "-c",
        "sed 's/o/a/g' /var/inputs/infile > /var/outputs/out",
    ],
)

It's a very simple task that uses sed to replace every occurrence of o in a file with a and write the result to a new file. A few things to note:

  • We're using the ubiquitous alpine image here because raw container tasks are designed to work with any container: Flyte manages the dataflow, and therefore builds the DAG, from the typed inputs and outputs.
  • When using FlyteFile as an input or output, the keyword name actually becomes the filename inside the container filesystem: infile shows up as /var/inputs/infile, and the command must write /var/outputs/out for the out output. If you try to use variable substitution via {{.inputs.infile}} as you would with other types, you'll run into something like the following error, because the full object-storage path is substituted in, which the container obviously can't interpret:
    sed: /var/inputs/s3://my-s3-bucket/data/82/f6da06ac4861f4dde89b-n1-0/b68e4787c8e70b0a6f64c04bada8b22a/hello.txt: No such file or directory
    
  • We're using the Shell Form here, but Exec Form would also work, albeit without shell redirection via >.
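To make the container's I/O contract concrete, here's a minimal local emulation in plain Python, assuming the same layout as the task above: the input arrives as a file named infile in the input directory, and the output must be written to a file named out in the output directory. The helper run_sed_like is hypothetical and uses temporary directories in place of /var/inputs and /var/outputs. It isn't part of flytekit and doesn't reproduce how Flyte actually stages data; it only shows why fixed filenames work and why a templated URI doesn't.

```python
import os
import tempfile


def run_sed_like(input_data_dir: str, output_data_dir: str) -> str:
    """Hypothetical local stand-in for the sed_task container.

    Reads the input named `infile` from input_data_dir and writes the
    output named `out` to output_data_dir, mirroring the fixed filenames
    the ContainerTask above relies on.
    """
    in_path = os.path.join(input_data_dir, "infile")
    out_path = os.path.join(output_data_dir, "out")
    with open(in_path) as src:
        text = src.read()
    # Same effect as sed 's/o/a/g': replace every "o" with "a".
    with open(out_path, "w") as dst:
        dst.write(text.replace("o", "a"))
    return out_path


if __name__ == "__main__":
    # Temporary directories play the roles of /var/inputs and /var/outputs.
    with tempfile.TemporaryDirectory() as inputs, tempfile.TemporaryDirectory() as outputs:
        with open(os.path.join(inputs, "infile"), "w") as f:
            f.write("hello\n")
        with open(run_sed_like(inputs, outputs)) as f:
            print(f.read(), end="")  # prints: hella

        # The substitution pitfall: gluing an object-store URI onto the
        # input directory yields a path that doesn't exist locally.
        uri = "s3://my-s3-bucket/input-data/hello.txt"
        print(os.path.exists(os.path.join(inputs, uri)))  # prints: False
```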

Directory Task

Next up we have a small but crucially important task that gets the directory we made earlier ready for processing in other tasks.

@task
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd

All this does is take a string representing the path to that directory, then create and return it as a proper FlyteDirectory. This matters because FlyteFiles and FlyteDirectories are uploaded to and downloaded from the object store at task boundaries. In practice, this means that if you define a FlyteFile in a task body, for example, and then try to open it in that same context, the container running that task will have no idea how to open that file.
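As a rough mental model of that boundary, here's a toy in plain Python. ToyObjectStore and download_dir are hypothetical names, the store is just a dict standing in for minio, and this is not how flytekit is implemented. The only point it illustrates is that an object-store URI is a string the pod can't open until the data behind it has been copied to local disk.

```python
import os
import tempfile
from typing import Dict, List


class ToyObjectStore:
    """Hypothetical stand-in for S3/minio: maps object URIs to bytes."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def put(self, uri: str, data: bytes) -> None:
        self.objects[uri] = data

    def list_prefix(self, prefix: str) -> List[str]:
        """Return every object URI stored under `prefix`, sorted."""
        base = prefix.rstrip("/") + "/"
        return sorted(uri for uri in self.objects if uri.startswith(base))


def download_dir(store: ToyObjectStore, prefix: str, local_dir: str) -> str:
    """Copy every object under `prefix` into `local_dir`, keeping relative paths.

    Models the task boundary: only after this copy does the pod have
    real files it can open.
    """
    base = prefix.rstrip("/") + "/"
    for uri in store.list_prefix(prefix):
        dest = os.path.join(local_dir, uri[len(base):])
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        with open(dest, "wb") as f:
            f.write(store.objects[uri])
    return local_dir


if __name__ == "__main__":
    store = ToyObjectStore()
    store.put("s3://my-s3-bucket/input-data/hello.txt", b"hello\n")
    store.put("s3://my-s3-bucket/input-data/world.txt", b"world\n")

    # The URI on its own is not a local file, so opening it fails.
    try:
        open("s3://my-s3-bucket/input-data/hello.txt")
    except OSError:
        print("not a local file yet")

    with tempfile.TemporaryDirectory() as pod_dir:
        local = download_dir(store, "s3://my-s3-bucket/input-data", pod_dir)
        print(sorted(os.listdir(local)))  # prints: ['hello.txt', 'world.txt']
```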

Dynamic Task

Since we may not know the contents of our arbitrary directory at compile time, we need to rely on our handy Dynamic Task to process the inputs at run time.

@dynamic
def process_files(indir: FlyteDirectory) -> List[FlyteFile]:
    all_out = []
    for fname in os.listdir(indir):
        f = os.path.join(indir, fname)
        file_out = ct(infile=f)
        all_out.append(file_out)
    return all_out

This task takes the FlyteDirectory we created earlier and loops through its contents, passing each file to our Container Task. The FlyteFile returned by each call is added to a list of processed files, which is then returned.
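If it helps to see the fan-out without a cluster, here's a plain-Python analogue of process_files, assuming a flat local directory of text files. process_files_locally is a hypothetical name; each loop iteration plays the role of one sed_task call, and the number of iterations is only known once the directory is listed at run time, which is why the Flyte version has to be a dynamic task. Sorting the listing and skipping subdirectories are my own additions for a deterministic, safe sketch; the Flyte version above passes along every entry os.listdir returns.

```python
import os
import tempfile
from typing import List


def process_files_locally(indir: str, outdir: str) -> List[str]:
    """Hypothetical local analogue of the process_files dynamic task.

    One "task" per file: the fan-out width depends on what os.listdir
    finds at run time, not on anything known when the code is written.
    """
    all_out = []
    for fname in sorted(os.listdir(indir)):  # sorted for a deterministic order
        src = os.path.join(indir, fname)
        if not os.path.isfile(src):
            continue  # only regular files can be fed to the sed step
        dst = os.path.join(outdir, fname)
        with open(src) as f_in, open(dst, "w") as f_out:
            # Same transformation as the container: sed 's/o/a/g'.
            f_out.write(f_in.read().replace("o", "a"))
        all_out.append(dst)
    return all_out


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as indir, tempfile.TemporaryDirectory() as outdir:
        for name, body in [("hello.txt", "hello\n"), ("world.txt", "world\n")]:
            with open(os.path.join(indir, name), "w") as f:
                f.write(body)
        # prints "hello.txt hella" and then "world.txt warld"
        for path in process_files_locally(indir, outdir):
            with open(path) as f:
                print(os.path.basename(path), f.read(), end="")
```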

Putting it all together

Finally we have the workflow definition that coordinates all these parts.

@workflow
def wf(indirpath: str) -> List[FlyteFile]:
    fdir = get_dir(dirpath=indirpath)
    outfiles = process_files(indir=fdir)
    return outfiles

All this does is take in a string from the command line, create the FlyteDirectory, and pass it to the Dynamic Task. You can now invoke this workflow on your local sandbox cluster with:

    pyflyte run --remote filebased_wf.py wf --indirpath s3://my-s3-bucket/input-data

Head over to the Flyte Console at http://localhost:30080/console to track its execution. Check out the inputs and outputs of the different tasks in the console and in minio, then download one of those outputs to make sure our Container Task actually did what it's supposed to.

That's all there is to it! Please feel free to comment if anything is unclear or could be improved. I hope you found this little "hella warld" demo useful. Wishing you tailwinds and blue skies on your Flyte journey!

A Note on File Existence

I've now run into an issue twice that's been a bit of a conundrum. In one instance, I was packaging a FlyteFile as part of a custom dataclass, and instantiating that dataclass failed, saying the file didn't exist. In another, I found it very strange that I needed to run os.listdir before I could run Path().rglob() on a directory.

Flyte handles certain file operations on FlyteFiles and FlyteDirectories for you, letting you interact with them as if they were on your local system, for example open() and os.listdir. Behind the scenes, however, Flyte is pulling those files into the pod's local storage before calling those functions. Until that happens, the assets don't exist locally, so anything beyond these very basic operations will fail and leave you scratching your head.

The simple solution is to call download() on the FlyteFile or FlyteDirectory before doing anything fancy, which pulls the data in from object storage. If you're getting a FileNotFoundError when the file definitely should be there, check this first.
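Here's a toy model of that behavior in plain Python, under loud assumptions: LazyDir is a hypothetical class, not flytekit's FlyteDirectory, a local folder stands in for object storage, and fetch-on-access is only assumed here as a plausible explanation of the symptoms above. Functions like os.listdir() call os.fspath() on their argument, so an object that downloads inside __fspath__ looks local to them, while code that grabs the bare local path (here, Path(d.path)) sees an empty directory until something triggers the download.

```python
import os
import shutil
import tempfile
from pathlib import Path


class LazyDir:
    """Hypothetical toy, NOT flytekit's FlyteDirectory.

    `remote` stands in for object storage and `path` for the pod-local
    directory, which stays empty until download() runs.
    """

    def __init__(self, remote: str, path: str) -> None:
        self.remote = remote
        self.path = path
        self._downloaded = False

    def download(self) -> str:
        """Copy the remote contents into the local path once, then reuse them."""
        if not self._downloaded:
            shutil.copytree(self.remote, self.path, dirs_exist_ok=True)  # Python 3.8+
            self._downloaded = True
        return self.path

    def __fspath__(self) -> str:
        # os.listdir(), open(), os.path.join(), ... call os.fspath(), which lands here.
        return self.download()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as remote, tempfile.TemporaryDirectory() as local:
        Path(remote, "nested").mkdir()
        Path(remote, "nested", "hello.txt").write_text("hello\n")
        d = LazyDir(remote, local)

        print(list(Path(d.path).rglob("*.txt")))  # prints: [] (nothing fetched yet)
        d.download()  # or anything that calls os.fspath(d), e.g. os.listdir(d)
        print([p.name for p in Path(d.path).rglob("*.txt")])  # prints: ['hello.txt']
```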

Full Code

Here's what your complete filebased_wf.py should look like:

import os
from typing import List
from flytekit import kwtypes, workflow, dynamic, task, ContainerTask
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory

ct = ContainerTask(
    name="sed_task",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(infile=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="alpine",
    command=[
        "/bin/sh",
        "-c",
        "sed 's/o/a/g' /var/inputs/infile > /var/outputs/out",
    ],
)

@task
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd

@dynamic
def process_files(indir: FlyteDirectory) -> List[FlyteFile]:
    all_out = []
    for fname in os.listdir(indir):
        f = os.path.join(indir, fname)
        file_out = ct(infile=f)
        all_out.append(file_out)
    return all_out

@workflow
def wf(indirpath: str) -> List[FlyteFile]:
    fdir = get_dir(dirpath=indirpath)
    outfiles = process_files(indir=fdir)
    return outfiles