Software Architecture Document: Metadata-Driven Workflow Engine
Version: 1.0
Date: June 12, 2025
Author: Gemini Architect
1. Motivation
The current system for executing data and machine learning pipelines involves multiple distinct workflows, each with its own dedicated codebase, repository, and CI/CD pipeline. While this approach allows for isolation, it has led to significant inefficiencies and operational challenges:
High Maintenance Overhead: Managing numerous repositories and deployment pipelines is time-consuming. A change to a common dependency or a bug fix in a shared logic pattern must be manually replicated across multiple codebases.
Code Duplication: Common tasks such as data loading, authentication, and logging are frequently rewritten or copied, leading to inconsistent implementations and a greater surface area for bugs.
Slow Development Velocity: Creating a new workflow requires substantial boilerplate code, repository setup, and CI/CD configuration, hindering the team's ability to rapidly iterate and deploy new data products.
Lack of Standardization: Without a centralized framework, workflows can diverge in quality, logging practices, and error handling, making them difficult to monitor and debug systematically.
This architecture aims to replace the fragmented system with a centralized, flexible, and efficient engine that promotes code reuse and accelerates workflow development.
2. Goals
This architectural design seeks to achieve the following primary goals:
Centralize Logic: Consolidate all workflow components into a single, well-structured codebase.
Promote Reusability: Develop a library of generic, configurable, and independently testable components (e.g., S3DataLoader, SQLRunner, ModelPredictor).
Be Metadata-Driven: Define the structure and parameterization of workflows in human-readable configuration files (YAML), completely decoupling the workflow definition from the engine code that executes it.
Accelerate Development: Enable stakeholders to create and launch new workflows simply by writing a new metadata file, without writing any new engine code or setting up new deployment pipelines.
Ensure Consistency: Provide a single, robust execution engine that guarantees uniform logging, error handling, monitoring, and dependency management for all workflows.
3. Non-Goals
To maintain a clear focus and manageable scope for the initial implementation, the following are considered non-goals for Version 1.0:
A Graphical User Interface (GUI): The engine will be operated via a command-line interface (CLI) or an API. A workflow builder UI is a potential future enhancement.
Distributed Big Data Processing: The engine will be designed for single-node, in-memory execution. It is not intended to replace or compete with distributed computing frameworks like Apache Spark or Dask.
Real-time Stream Processing: The system is designed for batch or micro-batch workloads, not for ultra-low-latency, event-at-a-time stream processing.
Advanced Orchestration Features: Complex features like dynamic sub-workflows, conditional branching based on output, or human-in-the-loop approval steps are out of scope for the initial version. The focus is on executing a static Directed Acyclic Graph (DAG) of tasks.
4. Stakeholders
Data Scientists / ML Engineers (Primary Users): Rapidly define, test, and deploy new model and feature pipelines using simple metadata files.
Data Engineers (Component Developers): Build, maintain, and test the library of robust and reusable workflow components.
Platform / DevOps Engineers: Deploy, monitor, and scale the central workflow engine, and ensure its reliability and uptime.
Project / Product Managers: Increase team velocity, reduce operational costs, and ensure consistent quality in data products.
5. Design Decisions
Decision 1: Centralized Engine with Component Registry
We will implement a central WorkflowEngine class responsible for orchestrating all workflows. This engine will use a Component Registry (a simple dictionary mapping) to dynamically discover and load the appropriate component class based on the component_name specified in the workflow metadata.
Justification: This design pattern decouples the engine from the components. New components can be added to the registry without any changes to the engine's core logic, making the system highly extensible.
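For illustration only, the registry can be as simple as a module-level dictionary. The import paths and the ChurnFeatureEngineer class name below are assumptions, not part of the specification:

# Sketch of the Component Registry: a plain dictionary mapping the
# component_name used in YAML to the implementing class.
# Import paths and ChurnFeatureEngineer are illustrative assumptions.
from components.data_loaders import S3CsvLoader
from components.feature_transformers import ChurnFeatureEngineer

COMPONENT_REGISTRY = {
    "s3_csv_loader": S3CsvLoader,
    "churn_feature_engineer": ChurnFeatureEngineer,
}

# Registering a new component is a one-line change to this mapping;
# the engine's core logic is never touched.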
Decision 2: In-Memory Data Exchange
Data will be passed between components in-memory. The engine will maintain an In-Memory Data Context (a Python dictionary) that holds the outputs of each executed task. A downstream task can reference these outputs by name as its inputs.
Justification: For workflows where the intermediate data artifacts fit comfortably in RAM, this approach is extremely fast. It avoids the performance overhead of serializing/deserializing data to disk or a network location between every step, which is ideal for the targeted use cases.
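As a concrete illustration (key names are taken from the churn example in Section 6, and the toy DataFrames are made up), the context is just a dictionary that grows as tasks complete, and downstream inputs are resolved by plain dictionary lookups:

import pandas as pd

# Illustrative sketch of how the in-memory data context evolves during a run.
data_context = {}

# After "load_customer_data" finishes, its output is stored under its declared name:
data_context["raw_customer_df"] = pd.DataFrame({"customer_id": [1, 2], "tenure_months": [5, 12]})

# After "engineer_features" finishes:
data_context["feature_df"] = data_context["raw_customer_df"].assign(
    tenure_years=lambda d: d["tenure_months"] / 12
)

# A task declaring inputs: {data_to_predict: "feature_df"} receives the object
# directly, with no serialization or disk I/O between steps:
inputs = {"data_to_predict": data_context["feature_df"]}
print(inputs["data_to_predict"])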
Decision 3: Standardized Component Interface
Every component will be a Python class that adheres to a strict interface. It must implement:
__init__(self, inputs, outputs, params): To receive its configuration.
execute(self): To perform its core logic and return a dictionary of its outputs.
Justification: A standardized interface is essential for the engine to treat all components polymorphically. It enforces a clean contract, simplifies testing, and makes components easy to reason about.
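A minimal sketch of that contract as an abstract base class follows; the BaseComponent name and the type hints are assumptions rather than part of the specification:

from abc import ABC, abstractmethod

class BaseComponent(ABC):
    """Illustrative base class capturing the component contract described above."""

    def __init__(self, inputs: dict, outputs: list, params: dict):
        # inputs:  objects resolved from the in-memory data context by the engine
        # outputs: names under which this component's results will be stored
        # params:  static configuration taken from the workflow YAML
        self.inputs = inputs or {}
        self.outputs = outputs or []
        self.params = params or {}

    @abstractmethod
    def execute(self) -> dict:
        """Perform the component's core logic and return {output_name: value}."""
        raise NotImplementedError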
Decision 4: YAML for Workflow Definitions
Workflows will be defined in YAML files. YAML is chosen for its superior human readability, support for comments, and widespread adoption in the configuration-as-code ecosystem (e.g., Kubernetes, CI/CD).
Justification: YAML is less verbose and easier for non-developers to write and review compared to JSON or XML. This lowers the barrier to entry for Data Scientists and other stakeholders to define their own workflows.
6. Implementation Details
A. High-Level Architecture
+---------------------------------+
|   Workflow Definition (YAML)    |
| - name: "my_workflow"           |
| - tasks: [...]                  |
+---------------------------------+
                |
                v
+---------------------------------+      +---------------------------+
|     InMemoryWorkflowEngine      |----->|    Component Registry     |
|---------------------------------|      |---------------------------|
| - data_context: {}              |      | "load_data": LoadDataCls  |
| - DAG resolver                  |      | "transform": TransformCls |
| - Task executor                 |      +---------------------------+
+---------------------------------+
                |
                v  (Instantiates and executes components in order)
+---------------------------------+      +---------------------------+
|        Active Component         |----->|  In-Memory Data Context   |
|---------------------------------|      |---------------------------|
| - __init__(...)                 |      | "raw_df": <DataFrame>     |
| - execute()                     |<-----| "features": <DataFrame>   |
+---------------------------------+      +---------------------------+
B. Project Structure
/workflow_system
├── components/
│   ├── __init__.py
│   ├── data_loaders.py
│   └── feature_transformers.py
├── workflows/
│   ├── churn_prediction.yaml
│   └── sales_forecasting.yaml
├── engine.py        # Contains the WorkflowEngine and ComponentRegistry
└── main.py          # CLI entry point to run a workflow
C. Core Classes and Data Structures
1. Example Workflow Definition (churn_prediction.yaml)
workflow_name: "customer_churn_prediction_pipeline"
description: "End-to-end pipeline for predicting customer churn."

tasks:
  - name: "load_customer_data"
    component_name: "s3_csv_loader"
    params:
      bucket: "my-data-bucket"
      key: "raw/customers.csv"
    outputs:
      - "raw_customer_df"

  - name: "engineer_features"
    component_name: "churn_feature_engineer"
    # This task depends on the output of the previous task
    inputs:
      input_df: "raw_customer_df"
    outputs:
      - "feature_df"

  - name: "run_prediction"
    component_name: "sklearn_predictor"
    inputs:
      data_to_predict: "feature_df"
    # Static values belong in params; inputs must reference upstream outputs
    params:
      model_path: "s3://my-models/churn/latest.pkl"
    outputs:
      - "predictions"

  - name: "save_results"
    component_name: "database_writer"
    inputs:
      data_to_save: "predictions"
    params:
      table_name: "churn_results"
2. The Engine (engine.py)
# A simplified representation of the engine's logic.

# The Component Registry maps a component_name from the YAML to the actual class.
COMPONENT_REGISTRY = { ... }

class InMemoryWorkflowEngine:
    def __init__(self, workflow_path):
        self.workflow_def = self._load_yaml(workflow_path)
        self.data_context = {}  # Stores all in-memory data passed between tasks
        self.task_graph = self._build_dag(self.workflow_def['tasks'])

    def run(self):
        execution_order = self._topological_sort(self.task_graph)
        for task_name in execution_order:
            task_info = self._get_task_info(task_name)

            # Resolve inputs by name from the in-memory data context
            inputs = {k: self.data_context[v] for k, v in task_info.get('inputs', {}).items()}

            # Instantiate the class registered under component_name and run it
            ComponentClass = COMPONENT_REGISTRY[task_info['component_name']]
            component = ComponentClass(inputs, task_info.get('outputs', []), task_info.get('params', {}))
            outputs = component.execute()

            # Store the task's outputs back into the context for downstream tasks
            self.data_context.update(outputs)
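The _build_dag and _topological_sort helpers are referenced above but not shown. One possible sketch, written as free functions so it is self-contained, assumes a task depends on whichever task declares the output it consumes as an input:

from collections import deque

def build_dag(tasks):
    """Map each task name to the set of task names it depends on (sketch)."""
    producer_of = {}                       # output name -> name of the producing task
    for task in tasks:
        for out in task.get('outputs', []):
            producer_of[out] = task['name']

    deps = {task['name']: set() for task in tasks}
    for task in tasks:
        for ref in task.get('inputs', {}).values():
            if ref in producer_of:         # defensively skip references no task produces
                deps[task['name']].add(producer_of[ref])
    return deps

def topological_sort(deps):
    """Kahn's algorithm over the dependency map; raises on a cycle."""
    remaining = {name: set(d) for name, d in deps.items()}
    ready = deque(name for name, d in remaining.items() if not d)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for other, d in remaining.items():
            if name in d:
                d.discard(name)
                if not d:
                    ready.append(other)
    if len(order) != len(remaining):
        raise ValueError("Workflow tasks contain a cyclic dependency")
    return order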
3. Example Component (data_loaders.py)
import pandas as pd

class S3CsvLoader:
    def __init__(self, inputs, outputs, params):
        # A loader takes no inputs from other tasks; it only produces an output
        self.output_name = outputs[0]
        self.bucket = params['bucket']
        self.key = params['key']

    def execute(self):
        # In a real implementation, use boto3 to read from S3
        path = f"s3://{self.bucket}/{self.key}"
        print(f"Loading data from {path}...")
        df = pd.read_csv(path)
        return {self.output_name: df}
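4. CLI Entry Point (main.py)
For completeness, main.py (listed in the project structure but not shown above) could be as thin as the following sketch; the positional argument name is an assumption:

# main.py -- illustrative CLI entry point for the engine
import argparse

from engine import InMemoryWorkflowEngine

def main():
    parser = argparse.ArgumentParser(description="Run a metadata-defined workflow.")
    parser.add_argument("workflow_path", help="Path to a workflow YAML file")
    args = parser.parse_args()

    engine = InMemoryWorkflowEngine(args.workflow_path)
    engine.run()

if __name__ == "__main__":
    main()

Running python main.py workflows/churn_prediction.yaml would then execute the churn pipeline defined earlier.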