# Software Architecture Document: Metadata-Driven Workflow Engine

**Version:** 1.0
**Date:** June 12, 2025
**Author:** Gemini Architect
## 1. Motivation

The current system for executing data and machine learning pipelines involves multiple distinct workflows, each with its own dedicated codebase, repository, and CI/CD pipeline. While this approach allows for isolation, it has led to significant inefficiencies and operational challenges:

- **High Maintenance Overhead:** Managing numerous repositories and deployment pipelines is time-consuming. A change to a common dependency or a bug fix in a shared logic pattern must be manually replicated across multiple codebases.
- **Code Duplication:** Common tasks such as data loading, authentication, and logging are frequently rewritten or copied, leading to inconsistent implementations and a greater surface area for bugs.
- **Slow Development Velocity:** Creating a new workflow requires substantial boilerplate code, repository setup, and CI/CD configuration, hindering the team's ability to rapidly iterate on and deploy new data products.
- **Lack of Standardization:** Without a centralized framework, workflows can diverge in quality, logging practices, and error handling, making them difficult to monitor and debug systematically.

This architecture aims to replace the fragmented system with a centralized, flexible, and efficient engine that promotes code reuse and accelerates workflow development.
## 2. Goals

This architectural design seeks to achieve the following primary goals:

- **Centralize Logic:** Consolidate all workflow components into a single, well-structured codebase.
- **Promote Reusability:** Develop a library of generic, configurable, and independently testable components (e.g., `S3DataLoader`, `SQLRunner`, `ModelPredictor`).
- **Be Metadata-Driven:** Define the structure and parameterization of workflows in human-readable configuration files (YAML), completely decoupling workflow definitions from the engine that executes them.
- **Accelerate Development:** Enable stakeholders to create and launch new workflows simply by writing a new metadata file, without writing any new engine code or setting up new deployment pipelines.
- **Ensure Consistency:** Provide a single, robust execution engine that guarantees uniform logging, error handling, monitoring, and dependency management for all workflows.
## 3. Non-Goals

To maintain a clear focus and manageable scope for the initial implementation, the following are considered non-goals for Version 1.0:

- **A Graphical User Interface (GUI):** The engine will be operated via a command-line interface (CLI) or an API. A workflow-builder UI is a potential future enhancement.
- **Distributed Big Data Processing:** The engine will be designed for single-node, in-memory execution. It is not intended to replace or compete with distributed computing frameworks like Apache Spark or Dask.
- **Real-Time Stream Processing:** The system is designed for batch or micro-batch workloads, not for ultra-low-latency, event-at-a-time stream processing.
- **Advanced Orchestration Features:** Complex features like dynamic sub-workflows, conditional branching based on task output, or human-in-the-loop approval steps are out of scope for the initial version. The focus is on executing a static Directed Acyclic Graph (DAG) of tasks.
## 4. Stakeholders

| Stakeholder Role | Key Interest |
|---|---|
| Data Scientists / ML Engineers (Primary Users) | Rapidly define, test, and deploy new model and feature pipelines using simple metadata files. |
| Data Engineers (Component Developers) | Build, maintain, and test the library of robust and reusable workflow components. |
| Platform / DevOps Engineers | Deploy, monitor, and scale the central workflow engine; ensure its reliability and uptime. |
| Project / Product Managers | Increase team velocity, reduce operational costs, and ensure consistent quality in data products. |
## 5. Design Decisions

### Decision 1: Centralized Engine with Component Registry

We will implement a central `WorkflowEngine` class responsible for orchestrating all workflows. This engine will use a Component Registry (a simple dictionary mapping) to dynamically discover and load the appropriate component class based on the `component_name` specified in the workflow metadata.

**Justification:** This design pattern decouples the engine from the components. New components can be added to the registry without any changes to the engine's core logic, making the system highly extensible.
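As a minimal sketch of this pattern (the decorator-based `register` helper is an illustrative assumption, not part of the specified design):

```python
# Hypothetical sketch of the Component Registry: a plain dictionary mapping
# the component_name used in the YAML metadata to the implementing class.
COMPONENT_REGISTRY: dict[str, type] = {}

def register(name: str):
    """Class decorator that adds a component class to the registry."""
    def decorator(cls):
        COMPONENT_REGISTRY[name] = cls
        return cls
    return decorator

@register("s3_csv_loader")
class S3CsvLoader:
    ...  # implementation shown later in this document

# The engine can then resolve a class purely from the YAML-provided name:
ComponentClass = COMPONENT_REGISTRY["s3_csv_loader"]
```

Registering components this way keeps the engine unaware of concrete classes; adding a new component only touches the component module itself.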
### Decision 2: In-Memory Data Exchange

Data will be passed between components in memory. The engine will maintain an In-Memory Data Context (a Python dictionary) that holds the outputs of each executed task. A downstream task can reference these outputs by name as its inputs.

**Justification:** For workflows whose intermediate data artifacts fit comfortably in RAM, this approach is extremely fast. It avoids the performance overhead of serializing and deserializing data to disk or a network location between every step, which is ideal for the targeted use cases.
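To make the exchange mechanism concrete, here is a small, self-contained illustration of the pattern; the DataFrame contents and column names are invented for the example.

```python
import pandas as pd

# Hypothetical illustration of the In-Memory Data Context: task outputs
# accumulate in one dictionary, and downstream tasks look them up by name.
data_context: dict[str, object] = {}

# Task 1 ("load_customer_data") stores its result under the name it declared.
data_context["raw_customer_df"] = pd.DataFrame({"customer_id": [1, 2], "tenure": [12, 3]})

# Task 2 ("engineer_features") resolves its input by a plain dict lookup --
# the same in-memory object is handed over, with no serialization step.
raw_df = data_context["raw_customer_df"]
data_context["feature_df"] = raw_df.assign(is_new=raw_df["tenure"] < 6)

print(list(data_context))  # ['raw_customer_df', 'feature_df']
```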
### Decision 3: Standardized Component Interface

Every component will be a Python class that adheres to a strict interface. It must implement:

- `__init__(self, inputs, outputs, params)`: to receive its configuration.
- `execute(self)`: to perform its core logic and return a dictionary of its outputs.

**Justification:** A standardized interface is essential for the engine to treat all components polymorphically. It enforces a clean contract, simplifies testing, and makes components easy to reason about.
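One way to formalize this contract is an abstract base class; the document does not mandate one, so the `BaseComponent` name and type hints below are illustrative assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any

class BaseComponent(ABC):
    """Hypothetical base class formalizing the component contract."""

    def __init__(self, inputs: dict[str, Any], outputs: list[str], params: dict[str, Any] | None):
        self.inputs = inputs        # objects resolved from the data context
        self.outputs = outputs      # names under which results will be stored
        self.params = params or {}  # static configuration from the YAML file

    @abstractmethod
    def execute(self) -> dict[str, Any]:
        """Run the component's logic and return {output_name: value}."""
        raise NotImplementedError
```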
### Decision 4: YAML for Workflow Definitions

Workflows will be defined in YAML files. YAML is chosen for its superior human readability, support for comments, and widespread adoption in the configuration-as-code ecosystem (e.g., Kubernetes, CI/CD).

**Justification:** YAML is less verbose and easier for non-developers to write and review than JSON or XML. This lowers the barrier to entry for Data Scientists and other stakeholders to define their own workflows.
## 6. Implementation Details

### A. High-Level Architecture

```
+---------------------------------+
|   Workflow Definition (YAML)    |
|   - name: "my_workflow"         |
|   - tasks: [...]                |
+---------------------------------+
                |
                v
+---------------------------------+      +---------------------------+
|     InMemoryWorkflowEngine      |----->|    Component Registry     |
|---------------------------------|      |---------------------------|
| - data_context: {}              |      | "load_data": LoadDataCls  |
| - DAG resolver                  |      | "transform": TransformCls |
| - Task executor                 |      +---------------------------+
+---------------------------------+
                |
                v  (Instantiates and executes components in order)
+---------------------------------+      +---------------------------+
|        Active Component         |----->|  In-Memory Data Context   |
|---------------------------------|      |---------------------------|
| - __init__(...)                 |      | "raw_df": <DataFrame>     |
| - execute()                     |<-----| "features": <DataFrame>   |
+---------------------------------+      +---------------------------+
```
### B. Project Structure

```
/workflow_system
├── components/
│   ├── __init__.py
│   ├── data_loaders.py
│   └── feature_transformers.py
├── workflows/
│   ├── churn_prediction.yaml
│   └── sales_forecasting.yaml
├── engine.py        # Contains the WorkflowEngine and ComponentRegistry
└── main.py          # CLI entry point to run a workflow
```
### C. Core Classes and Data Structures

**1. Example Workflow Definition (`churn_prediction.yaml`)**

```yaml
workflow_name: "customer_churn_prediction_pipeline"
description: "End-to-end pipeline for predicting customer churn."

tasks:
  - name: "load_customer_data"
    component_name: "s3_csv_loader"
    params:
      bucket: "my-data-bucket"
      key: "raw/customers.csv"
    outputs:
      - "raw_customer_df"

  - name: "engineer_features"
    component_name: "churn_feature_engineer"
    # This task depends on the output of the previous task
    inputs:
      input_df: "raw_customer_df"
    outputs:
      - "feature_df"

  - name: "run_prediction"
    component_name: "sklearn_predictor"
    inputs:
      data_to_predict: "feature_df"
    params:
      model_path: "s3://my-models/churn/latest.pkl"
    outputs:
      - "predictions"

  - name: "save_results"
    component_name: "database_writer"
    inputs:
      data_to_save: "predictions"
    params:
      table_name: "churn_results"
```

Note that literal configuration values (such as `model_path`) belong under `params`, while `inputs` entries are names resolved against the In-Memory Data Context.
**2. The Engine (`engine.py`)**

```python
# A simplified representation of the engine's logic.

# Component Registry maps the component_name from the YAML to the actual class.
COMPONENT_REGISTRY = { ... }

class InMemoryWorkflowEngine:
    def __init__(self, workflow_path):
        self.workflow_def = self._load_yaml(workflow_path)
        self.data_context = {}  # Stores all in-memory data
        self.task_graph = self._build_dag(self.workflow_def['tasks'])

    def run(self):
        execution_order = self._topological_sort(self.task_graph)
        for task_name in execution_order:
            task_info = self._get_task_info(task_name)

            # Resolve inputs from the data context
            inputs = {k: self.data_context[v]
                      for k, v in task_info.get('inputs', {}).items()}

            # Instantiate and run the component
            ComponentClass = COMPONENT_REGISTRY[task_info['component_name']]
            component = ComponentClass(inputs, task_info.get('outputs', []),
                                       task_info.get('params'))
            outputs = component.execute()

            # Store results back in the context
            self.data_context.update(outputs or {})
```
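The helper methods referenced above (`_load_yaml`, `_build_dag`, `_topological_sort`, `_get_task_info`) are not specified in this document. A possible sketch, inferring task dependencies from the `inputs`/`outputs` declarations and relying on the standard-library `graphlib` module, is shown below; these methods would live inside `InMemoryWorkflowEngine`.

```python
import yaml
from graphlib import TopologicalSorter  # standard library since Python 3.9

# Sketch only: assumed implementations of the engine's private helpers.

def _load_yaml(self, workflow_path):
    """Parse the workflow definition file."""
    with open(workflow_path) as f:
        return yaml.safe_load(f)

def _build_dag(self, tasks):
    """Infer dependencies: a task depends on the producers of its inputs."""
    producer = {out: task['name']
                for task in tasks
                for out in task.get('outputs', [])}
    return {task['name']: {producer[ref]
                           for ref in task.get('inputs', {}).values()
                           if ref in producer}
            for task in tasks}

def _topological_sort(self, task_graph):
    """Return task names in execution order; raises CycleError for non-DAGs."""
    return list(TopologicalSorter(task_graph).static_order())

def _get_task_info(self, task_name):
    """Look up a task's definition by name."""
    return next(t for t in self.workflow_def['tasks'] if t['name'] == task_name)
```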
**3. Example Component (`data_loaders.py`)**

```python
import pandas as pd

class S3CsvLoader:
    def __init__(self, inputs, outputs, params):
        # A loader takes no inputs from other tasks; it is configured via params.
        self.output_name = outputs[0]
        self.bucket = params['bucket']
        self.key = params['key']

    def execute(self):
        # In a real implementation, use boto3 (or s3fs via pandas) to read from S3
        path = f"s3://{self.bucket}/{self.key}"
        print(f"Loading data from {path}...")
        df = pd.read_csv(path)
        return {self.output_name: df}
```
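The `main.py` CLI entry point listed in the project structure is not shown above. A minimal sketch, assuming `argparse` for argument parsing and the `InMemoryWorkflowEngine` from `engine.py`:

```python
# main.py -- hypothetical CLI entry point (not specified in this document)
import argparse

from engine import InMemoryWorkflowEngine

def main():
    parser = argparse.ArgumentParser(description="Run a metadata-defined workflow.")
    parser.add_argument("workflow_path",
                        help="Path to a workflow YAML file, e.g. workflows/churn_prediction.yaml")
    args = parser.parse_args()

    engine = InMemoryWorkflowEngine(args.workflow_path)
    engine.run()

if __name__ == "__main__":
    main()
```

A workflow would then be launched with `python main.py workflows/churn_prediction.yaml`.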