ドキュメントバージョン: 1.1 最終更新: 2025-10-23 著者: Graflowチーム
更新履歴:
- v1.1 (2025-10-23): コア機能 #9 に「チェックポイント/リジューム」を追加し、関連セクションを全面刷新
- v1.0 (2025-10-22): 初版
Graflowは、汎用ワークフロー実行エンジンであり、タスクオーケストレーションシステムの長所に、動的グラフ変更、ワーカーフリート管理、プラグ可能な実行戦略、チェックポイント/リジューム機能、Pythonic DSL設計といった独自の革新を組み合わせています。
| 機能 | 説明 | 競合ツール |
|---|---|---|
| ワーカーフリート管理 | TaskWorkerによるグループ化タスクの分散並列実行 | Celery、Airflow(部分的) |
| ランタイム動的タスク | 実行中にワークフローグラフを変更可能 | なし(LangGraphはコンパイル時のみ) |
| ステートマシン実行 | next_iteration() による第一級のサイクルサポート | なし |
| Pythonic演算子DSL | ワークフローグラフ(DAG+ループ)を数学的構文(>>、|)で表現 |
なし |
| プラグ可能タスクハンドラー | GPU、SSH、クラウド、Dockerなどのカスタム実行戦略 | Celery / Airflow で限定的 |
| Docker実行 | コンテナ化タスク実行を標準装備 | 外部ツールが必要 |
| 細粒度エラーポリシー | 5つの組み込み並列グループエラーモード | なし |
| シームレスなローカル/分散切替 | 1行でバックエンドを切り替え | なし(多くは追加インフラ必須) |
| チャンネルベース通信 | ワークフロー全体で共有する名前空間付きKVS | XCom(Airflow)、State(LangGraph) |
| チェックポイント/リジューム | タスク内からのユーザー制御チェックポイント保存 | LangGraph(自動のみ)、Airflow(限定的) |
Graflowの独自機能は9つのカテゴリーで構成されています。
- ワーカーフリート管理 — タスクグループを分散並列実行
- ランタイム動的タスク — 実行中にグラフ構造を変更
- ステートマシン実行 — next_iterationによる第一級サイクル制御
- プラグ可能タスクハンドラー — Dockerを含む柔軟な実行戦略
- 細粒度エラーポリシー — 並列グループ単位で柔軟に制御
- Pythonic演算子DSL — DAGとループを直感的に記述
- シームレスなローカル/分散切替 — バックエンドのワンライン切替
- チャンネル通信 — 名前空間付きKVSによるステート共有
- チェックポイント/リジューム — ワークフロー状態の永続化と復旧
実装ファイル: graflow/worker/worker.py, examples/05_distributed/redis_worker.py
GraflowはTaskWorkerプロセスを標準装備しており、任意の場所でワーカーを起動しやすく分散並列処理を構成できます。
┌─────────────┐
│ メインプロセス │ タスクをRedisキューへ投入
└─────┬───────┘
│
▼
┌─────────────────────────────────────────┐
│ Redisタスクキュー │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Task 1 │ │ Task 2 │ │ Task 3 │ │
│ └────────┘ └────────┘ └────────┘ │
└───────┬──────────┬──────────┬──────────┘
│ │ │
┌────▼───┐ ┌───▼────┐ ┌──▼─────┐
│Worker1│ │Worker2 │ │Worker3 │
│ 4 CPU │ │ 8 CPU │ │16 CPU │
└────────┘ └────────┘ └────────┘
- 専用TaskWorkerプロセス: サーバーやコンテナからキューに接続して実行
- Graceful Shutdown: SIGTERM/SIGINTで安全に停止、処理中タスクを完了
- ThreadPoolExecutor: ワーカー内でのマルチタスク処理
- メトリクス収集: 処理件数、成功/失敗数、総実行時間などをエクスポーズ
- 水平スケール: ワーカー追加でリニアにスループットを伸長
- 特化ワーカー: GPU専用、I/O専用などの構成が容易
- 本番運用テンプレート: systemdサービス、Dockerfile、Kubernetes Manifestを提供
| 観点 | Graflow | Celery | LangGraph | Airflow |
|---|---|---|---|---|
| CLIワーカー | ✅ 標準 | ✅ | ❌ | ✅ |
| Graceful停止 | ✅ | ✅ | N/A | ✅ |
| メトリクス | ✅ 標準 | ❌ | ✅ | |
| オートスケール | ✅ | ✅ | ❌ | |
| ステート共有 | ✅ Redisチャンネル |
実装ファイル: examples/07_dynamic_tasks/runtime_dynamic_tasks.py
実行中の入力に応じてタスクを増減でき、静的DAGでは対応しづらい適応型ワークフローを表現できます。
@task(inject_context=True)
def adaptive_processor(context: TaskExecutionContext):
data_quality = check_quality()
if data_quality < 0.5:
cleanup_task = TaskWrapper("cleanup", cleanup_low_quality_data)
context.next_task(cleanup_task)
elif data_quality > 0.9:
enhance_task = TaskWrapper("enhance", enhance_high_quality_data)
context.next_task(enhance_task)
else:
process_task = TaskWrapper("process", standard_processing)
context.next_task(process_task)context.next_taskで新タスクを生成しキューへ投入- 既存タスクへジャンプする
goto=Trueもサポート - 動的なステップ追加でもチェックポイント/リジュームと整合
next_iteration() を用いることで、収束処理や状態遷移を明示的に表現できます。チャンネルに状態を保存しながらループするパターンを推奨します。
@task(inject_context=True)
def iterative_task(context: TaskExecutionContext):
channel = context.get_channel()
iteration = channel.get("iteration", 0)
result = run_step(iteration)
channel.set("iteration", iteration + 1)
if not converged(result):
context.next_iteration()
else:
channel.set("result", result)
return "DONE"- サイクル数は CycleController が制限し、無限ループを防止
- チェックポイントと組み合わせることで途中経過の復旧が可能
実装ファイル: graflow/core/handlers/
Graflowは実行戦略(Execution Strategy)をプラグインとして定義でき、ローカル実行、Docker、SSH、クラウドAPI呼び出しなどを同一DSLで混在させられます。
| ハンドラー | 用途 | 特徴 |
|---|---|---|
direct |
プロセス内実行 | デバッグ、単体テストに最適 |
docker |
コンテナ実行 | 依存関係の隔離、GPUサポート |
group_policy |
並列グループ制御 | エラーポリシーと連携 |
| カスタム | 任意実装 | APIコール、リモート実行、GPU特化など |
ハンドラーは @task(handler="docker", handler_kwargs={...}) のように指定します。
実装ファイル: graflow/core/handlers/group_policy.py
並列グループに対して5種類のエラーハンドリングモードを提供。
| モード | 動作 | 典型ユースケース |
|---|---|---|
strict |
いずれか失敗したらグループ全体を失敗扱い | 重要業務ワークフロー |
best_effort |
成功分のみを採用し失敗は無視 | 部分成功を許容するデータ収集 |
at_least_n |
指定した件数以上が成功すれば続行 | アンサンブル推論(3件中2件成功で十分など) |
critical_tasks |
指定したタスクだけは必ず成功させる | プライマリ処理は必須、副次処理は任意 |
custom |
独自ポリシーを実装して適用 | 特殊要件に合わせたエラーロジック |
これらのモードは parallel_group.with_policy(...) で適用でき、ErrorHandlingPolicy ヘルパーを介して簡潔に指定可能です。ライブ統計に基づき、失敗時の自動リトライ/スキップを制御できます。
実装ファイル: graflow/core/workflow.py
Graflowは演算子オーバーロードを使って、DAGおよびサイクルを数学的に記述します。
with workflow("etl") as wf:
fetch >> (transform_a | transform_b) >> merge >> load
@task(inject_context=True)
def loop(context):
if not context.get_channel().get("done"):
context.next_iteration()| 構文 | 意味 |
|---|---|
a >> b |
aの出力がbの入力 |
a | b |
並列分岐 |
(a | b) >> c |
ファンイン |
next_iteration() |
同一タスクの次サイクルをキューに投入 |
context.next_task(executable) |
新しいタスクを生成しキューに追加 |
context.next_task(existing, goto=True) |
既存タスクへジャンプ(後続をスキップ) |
ポイント: GraflowはDAGだけでなく、状態機械のループもDSLで表現できます。
Graflow では ExecutionContext のキューは常にインメモリですが、並列・分散実行が必要な ParallelGroup(例: (task_a | task_b))に対しては GroupExecutor を通じてバックエンドを切り替えます。
parallel = (task_a | task_b | task_c).with_execution(
backend=CoordinationBackend.REDIS,
backend_config={"redis_client": redis_client, "key_prefix": "graflow:prod"}
)
parallel.run()- Sequential(直列)タスクは常にローカル実行(in-memory queue)で処理される
(task_a | task_b)のような ParallelGroup 実行時にのみ、THREADING や REDIS バックエンドを選択可能- REDIS バックエンドの場合、
GroupExecutorが Redis キューへタスクを発行し、TaskWorkerがそれを消費する(キープレフィックスを合わせること) - ParallelGroup の実行モデルは Bulk Synchronous Parallel (BSP) に準拠しており、並列タスク完了後にバリア同期を行ってから次段へ進む
実装ファイル: graflow/channels/
チャンネルはワークフロー全体で共有される名前空間付きKey-Valueストアです。
- MemoryChannel(デフォルト): ローカル実行向け、チェックポイントに含まれる
- RedisChannel: 分散実行向け、ワーカー間でステート共有
- TypedChannel: TypedDictでスキーマを強制
channel = context.get_channel()
channel.set("epoch", 10)
loss = channel.get("loss")ベストプラクティス
- チャンネルを単一の真実として扱い、副作用を明確化
- 大規模データは外部ストレージへ格納し、チャンネルには参照のみを保存
- チェックポイント利用時はチャンネルデータも保存される点に留意
実装ファイル: graflow/core/checkpoint.py, tests/core/test_checkpoint.py
Graflowはタスク内から context.checkpoint() を呼び出すだけで、ワークフローの完全なスナップショットを取得できます。チェックポイントはタスク完了直後にエンジンが作成するため、一貫したステートが保存されます。
checkpoint_base_path/
├── checkpoint.pkl # ExecutionContext(グラフやチャンネルを含む)
├── checkpoint.state.json # 実行ステート(ステップ数、保留タスク、サイクル情報)
└── checkpoint.meta.json # メタデータ(タイムスタンプ、ユーザーメタデータ)
@task(inject_context=True)
def process_batch(context):
run_expensive_step()
context.checkpoint(metadata={"stage": "post_processing"})- タスクは処理を完了させた上でチェックポイントが作成される
- 直近のパスは
context.execution_context.last_checkpoint_pathに保存 - メタデータでバッチ番号や状態名を記録するとリジュームが容易
checkpoint_path, metadata = context.checkpoint(
metadata={"stage": "critical_section"},
deferred=False,
path="checkpoints/manual/snapshot.pkl"
)
print(f"Checkpoint saved to {checkpoint_path}")deferred=Falseでその場でファイルを作成し、現在タスクから再開できる(path, metadata)を即時に取得可能
from graflow.core.checkpoint import CheckpointManager
checkpoint_path, metadata = CheckpointManager.create_checkpoint(
execution_context,
metadata={"stage": "before_shutdown"}
)from graflow.core.checkpoint import CheckpointManager
from graflow.core.engine import WorkflowEngine
checkpoint_path = "checkpoints/session_12345_step_40.pkl"
restored_context, metadata = CheckpointManager.resume_from_checkpoint(checkpoint_path)
engine = WorkflowEngine()
engine.execute(restored_context)- グラフや保留タスクはチェックポイントに含まれるため再構築不要
- Redisバックエンド使用時は同じRedisインスタンスへ接続すること
- 長時間タスクに
context.checkpoint(metadata=...)を挿入 - 出力されたパス(ログまたは
last_checkpoint_path)を記録 - 再起動時に
CheckpointManager.resume_from_checkpoint(path)を呼び、WorkflowEngine.execute()に渡す
- 長時間ML訓練
@task(inject_context=True)
def train_model(context):
channel = context.get_channel()
epoch = channel.get("epoch", 0)
model = channel.get("model")
model = train_epoch(model)
channel.set("model", model)
channel.set("epoch", epoch + 1)
if (epoch + 1) % 10 == 0:
context.checkpoint(metadata={"epoch": epoch + 1})
if epoch + 1 < max_epochs:
context.next_iteration()
else:
return "Training complete"- 途中で障害が起きても最新チェックポイントから再開(例: epoch 40)
- 多段ETLパイプライン
with workflow("etl_pipeline") as wf:
extract >> validate >> transform >> load
@task(inject_context=True)
def transform(context):
result = expensive_transformation()
context.checkpoint(metadata={"stage": "transformed"})
return result- 分散ワークフローのフェイルオーバー
from graflow.queue.redis import RedisTaskQueue
context = ExecutionContext.create(
graph,
"start",
channel_backend="redis",
config={"redis_client": redis_client, "key_prefix": "graflow:etl"}
)
redis_queue = RedisTaskQueue(context, redis_client=redis_client, key_prefix="graflow:etl")
@task(inject_context=True)
def distributed_step(context):
process_partition()
context.checkpoint(metadata={"worker": "worker-1"})
finalize_partition()- ステートマシンの遷移ごとに保存
@task(inject_context=True)
def order_state_machine(context):
channel = context.get_channel()
state = channel.get("order_state", "NEW")
if state == "NEW":
validate_order()
channel.set("order_state", "VALIDATED")
context.checkpoint(metadata={"state": "VALIDATED"})
context.next_iteration()
elif state == "VALIDATED":
process_payment()
channel.set("order_state", "PAID")
context.checkpoint(metadata={"state": "PAID"})
context.next_iteration()
elif state == "PAID":
ship_order()
return "ORDER_COMPLETE"- ローカルバックエンド: ファイルベース(
.pkl,.state.json,.meta.json) - Redisバックエンド: 分散環境での共有チェックポイント(設計済み)
- S3バックエンド: クラウドストレージ対応を計画
- 完全な状態保存: グラフ、チャンネル、保留タスク、サイクル情報、ユーザーメタデータ
- バリデーション: schema_version 検証と原子的ファイル書き込みで破損を防止
- コストの高い処理直後に
context.checkpoint()を挿入 - メタデータにバッチ番号やタイムスタンプを格納
- リジューム失敗時はログを確認し初期状態から再実行
- 古いチェックポイントは世代管理して容量を最適化
- CIで
CheckpointManager.resume_from_checkpointのユニットテストを実施
| 機能 | Graflow | LangGraph | Celery | Airflow |
|---|---|---|---|---|
| Pythonic DSL | ✅ >>, |(DAG+サイクル) |
❌ | ||
| ランタイム動的タスク | ✅ next_task() |
❌ | ❌ | |
| ステートマシン実行 | ✅ next_iteration() + チャンネル |
❌ | ❌ | |
| ワーカーフリートCLI | ✅ 組み込み | ❌ | ✅ | ✅ |
| カスタムハンドラー | ✅ プラグ可能 | ❌ | ||
| Docker実行 | ✅ 組み込み | ❌ | ||
| 並列エラーポリシー | ✅ 5モード + カスタム | ❌ | ||
| ローカル/分散切替 | ✅ 1行 | ❌ | ❌ | ❌ |
| チャンネル通信 | ✅ 名前空間付きKVS | ❌ | ||
| Graceful Shutdown | ✅ 標準 | N/A | ✅ | ✅ |
| メトリクス収集 | ✅ ワーカーレベル | ❌ | ✅ | |
| サイクル検出 | ✅ 組み込み | N/A | ❌ | |
| コンテキストマネージャー | ✅ with workflow() |
❌ | ❌ | ❌ |
| 型安全性 | ✅ TypedChannel | ✅ Pydantic | ❌ | ❌ |
| チェックポイント/リジューム | ✅ 本番対応 | ❌ |
| 指標 | Graflow | LangGraph | Celery | Airflow |
|---|---|---|---|---|
| ローカル実行オーバーヘッド | 低(インプロセス) | 低 | 高(ブローカー経由) | 高(DB) |
| 分散レイテンシ | 中(Redis) | N/A | 中 | 高(ポーリング) |
| スループット | 高(ワーカーフリート) | 低(単一プロセス) | 高 | 中 |
| メモリフットプリント | 中 | 低 | 中 | 高 |
- 汎用データ/MLパイプライン: ETL、特徴量生成、バッチ推論
- 動的ワークフロー: 条件分岐が多い処理、グラフのオンデマンド拡張
- 分散実行: ワーカーフリート管理、地理分散、リソース特化
- カスタム実行戦略: リモートAPI呼び出し、GPU/TPU、レート制限制御
- 開発スピード重視: ローカルで即実行しつつ本番へ段階移行
- ステートマシン/ループ処理: 注文処理、状態遷移、ゲームループ
- コンテナ化実行: 隔離環境、依存関係衝突回避、信頼できないコード
- 長時間ワークフローの信頼性確保: 定期的なチェックポイントと再開が必要な処理
- 会話型AIやLLMエージェント
- ツール呼び出し中心のフロー
- 内蔵の自動チェックポイント/再実行を活用したい場合
- ヒューマンループが必須の対話型シナリオ
- シンプルなバックグラウンドジョブ(メール送信など)
- 分散タスクキューのみが必要
- 既存Celeryエコシステムへ統合する場合
- ETL中心の定期バッチ
- 豊富なOperatorライブラリが必要
- 既存Airflowインフラが整備済み
┌─────────────────────────────────┐
│ 単一サーバーインスタンス │
│ ┌──────────────┐ ┌──────────┐ │
│ │ Graflowアプリ │ │ Redis (ローカル) │ │
│ └──────────────┘ └──────────┘ │
│ ┌──────────────┐ │
│ │ Graflow Worker │ (ThreadPoolで並列) │
│ └──────────────┘ │
└─────────────────────────────────┘
┌──────────────┐
│ Redisクラスタ │
└──────┬───────┘
│
┌───┴────┬─────┬─────┐
┌─▼───┐ ┌─▼───┐ ┌─▼───┐ ┌─▼───┐
│API │ │Worker│ │Worker│ │Worker│
│サーバ│ │ CPU │ │ GPU │ │ I/O │
└─────┘ └─────┘ └─────┘ └─────┘
- Docker Compose: Redisとワーカー複数を簡単に起動
- Kubernetes: Helmチャートでスケール・監視を標準化
- Systemd: オンプレ環境向けの永続ワーカーサービス
- Prometheus / Grafana によるワーカー指標収集
- キュー長、成功率、タスクレイテンシ(P50/P95/P99)を可視化
- Redis Commander でキューの検査
- 将来的なダッシュボードAPIに備えたメトリクスエンドポイント
- 高度なモニタリング: Prometheusエクスポーター、Grafanaテンプレート、リアルタイムUI
- 高度なスケジューリング: Cron、優先度キュー、ワークフロー間依存
- ワークフロー合成: ネスト構成、テンプレート、ライブラリ化
- 統合強化: Kubernetesネイティブ、AWS Lambda、Apache Beam
- チェックポイント拡張: Redis/S3バックエンド、圧縮、ライフサイクル管理
Graflowは次世代のワークフローオーケストレーションを体現し、以下を同時に満たします。
- Pythonicな表現力 — DSLでDAGとループをシンプルに表現
- 本番運用の堅牢性 — ワーカーフリート、並列エラーポリシー、チェックポイント
- 開発の俊敏性 — ローカルと分散のシームレスな切替
- 拡張性 — カスタムハンドラー/実行戦略で容易に拡張
- 動的対応力 — 実行時タスク生成とステートマシン制御
LangGraphの軽量性とAirflowの重量級インフラの間を埋め、現代のデータ/MLワークフローに最適な選択肢を提供します。
ドキュメント管理者: Graflowチーム
最終レビュー: 2025-10-23(チェックポイント機能アップデート反映)
次回レビュー: 四半期ごと