|
from dagster import op, job, DynamicOutput, Config, OpExecutionContext, DynamicOut, Definitions
from dagster_k8s import execute_k8s_job
from pydantic import Field
from typing import List
import random
import string
|
|
|
def split_raw_data_into_partitions(context: OpExecutionContext, path: str, batch_size: int) -> List[str]:
    """Call an external system to stage raw data into a series of partitions (stubbed with random keys here)."""
    context.log.info(f"Splitting {path} into {batch_size} partitions")
    # random.sample keeps the stub keys unique; duplicate DynamicOutput mapping keys would fail at runtime
    return random.sample(string.ascii_letters, batch_size)
|
|
|
class ProcessingConfig(Config):
    batch_size: int = Field(10, description="Number of batches for parallel processing")
    root_data_path: str = Field(description="Path to raw data to process")
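
# For reference (a sketch, not from the original): the same settings expressed as launchpad
# run-config YAML nest under the op name, e.g.
#
#   ops:
#     prepare_partitions:
#       config:
#         root_data_path: /data/raw   # placeholder path
#         batch_size: 4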
|
|
|
@op(out=DynamicOut())
def prepare_partitions(context: OpExecutionContext, config: ProcessingConfig):
    """Given raw data and a batch size, make partitions."""
    partition_keys = split_raw_data_into_partitions(context, config.root_data_path, config.batch_size)
    for key in partition_keys:
        # Each DynamicOutput becomes its own downstream branch, keyed by the partition key
        yield DynamicOutput(key, mapping_key=key)
|
|
|
@op
def first_compute_piece(context: OpExecutionContext, partition_key: str):
    """Run the first piece of processing for a partition using image x."""
    context.log.info(f"Launching first container to process {partition_key}")

    # Disabled so the example can run without a Kubernetes cluster; flip the guard to launch
    # the work as a real k8s job (execute_k8s_job needs the op context as its first argument).
    if False:
        execute_k8s_job(
            context,
            image="busybox",
            command=["/bin/sh", "-c"],
            args=[f"echo {partition_key}"],
        )

    return partition_key
|
|
|
@op
def second_compute_piece(context: OpExecutionContext, partition_key: str):
    """Run the second piece of processing for a partition using image x."""
    context.log.info(f"Launching second container to process {partition_key}")

    # Disabled for the same reason as above; pass the op context through when enabling.
    if False:
        execute_k8s_job(
            context,
            image="busybox",
            command=["/bin/sh", "-c"],
            args=[f"echo {partition_key}"],
        )

    return partition_key
|
|
|
@op
def merge_and_analyze(context: OpExecutionContext, partitions: List[str]):
    """Do some final processing that needs the result of all the partitions."""
    context.log.info(f"Finished processing the partitions: {partitions}")
    return partitions
|
|
|
@job
def run_pipeline_job():
    pieces = prepare_partitions()
    # Fan out: each dynamic partition flows through both compute pieces in its own branch
    results = pieces.map(first_compute_piece).map(second_compute_piece)
    # Fan in: gather every branch's result before the final merge step
    merge_and_analyze(results.collect())
|
|
|
|
|
defs = Definitions(
    jobs=[run_pipeline_job],
)
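

# A minimal local smoke test (a sketch, not part of the original pipeline). It runs the job
# in-process with example config; "/data/raw" is a placeholder path, and the k8s calls above
# stay disabled, so only the orchestration logic is exercised.
if __name__ == "__main__":
    from dagster import RunConfig

    result = run_pipeline_job.execute_in_process(
        run_config=RunConfig(
            ops={"prepare_partitions": ProcessingConfig(root_data_path="/data/raw", batch_size=4)}
        )
    )
    assert result.success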