Skip to content

Instantly share code, notes, and snippets.

@myui
Last active May 30, 2025 02:11
Show Gist options
  • Save myui/b6a19663daff2fcd724fb6808ca7d86c to your computer and use it in GitHub Desktop.
Save myui/b6a19663daff2fcd724fb6808ca7d86c to your computer and use it in GitHub Desktop.

Redis FairQueue 設計指示書(最終版)

プロジェクト概要

基本情報

  • プロジェクト名: fairque
  • ディレクトリ: /Users/myui/workspace/myui/fairque
  • パッケージ管理: uv
  • Python バージョン: 3.10以上
  • 型注釈: 必須(Full Typing)
  • コメント: 英語

サポート対象

  • Redis 7.2.5
  • Valkey 7.2.6
  • Amazon MemoryDB for Redis

アーキテクチャ設計

基本設計方針

  1. Redis/Valkeyを永続化ストレージとして使用(In-memory cacheではない)
  2. Luaスクリプトによるサーバーサイド処理を重視(効率とアトミック性)
  3. ロック範囲を最小化したアトミック処理
  4. マルチワーカー対応
  5. Work Stealing機構によるロードバランシング

キュー構造

queue:user:{user_id}:critical  # ユーザー毎のCriticalキュー (List)
queue:user:{user_id}:normal    # ユーザー毎のNormalキュー (Sorted Set)

処理優先度

  1. ユーザー選択: Round Robin(Work Stealingあり)
  2. ユーザー内優先度: Critical → Normal(完全優先)
  3. Critical Queue: FIFO(先入先出)
  4. Normal Queue: Priority + Time-weighted Scoring

タスク仕様

タスクデータ構造

@dataclass
class Task:
    task_id: str
    user_id: str
    priority: int          # 1-5 (1=最低優先度, 5=最高優先度)
    queue_type: QueueType  # critical/normal
    payload: Dict[str, Any]
    retry_count: int = 0
    max_retries: int = 3
    created_at: float      # タイムスタンプ
    execute_after: float   # 実行可能時刻

優先度システム

  • Priority範囲: 1-5(1=最低優先度、5=最高優先度)
  • Priority重み: priority / 5
    • Priority 1: weight = 0.2(最低)
    • Priority 2: weight = 0.4
    • Priority 3: weight = 0.6
    • Priority 4: weight = 0.8
    • Priority 5: weight = 1.0(最高)

スコア計算(Normal Queue)

def _calculate_score(self, task: Task) -> float:
    current_time = time.time()
    elapsed_time = current_time - task.created_at
    priority_weight = task.priority / 5.0
    
    return task.created_at + (priority_weight * elapsed_time)

特徴:

  • Higher score = Higher priority
  • Priority 5のタスクは時間経過とともにスコアが大きく増加
  • Priority 1のタスクは時間経過の影響を最小限に抑制
  • 長時間待機している高優先度タスクが適切に処理される

Worker設計

Worker構成

  • 設定ファイル: YAML形式
  • 担当ユーザー: マニュアル設定
  • Work Stealing: Rotationベース

Worker設定例

worker:
  id: "worker-001"
  assigned_users: ["user:1", "user:3", "user:5"]
  steal_targets: ["user:2", "user:4", "user:6", "user:7"]
  poll_interval_seconds: 1.0
  task_timeout_seconds: 300

redis:
  host: "localhost"
  port: 6379
  db: 0

Work Stealing機構

user_queues = deque(["user:1", "user:2", "user:3"])
user_queues.rotate(-1)  # 毎回ローテーション

# Round Robin でユーザー選択後、
# 選択されたユーザーのCritical → Normal順で処理

処理フロー

タスク投入フロー

  1. Critical Task: LPUSH queue:user:{user_id}:critical
  2. Normal Task:
    score = task.created_at + (task.priority/5.0) * elapsed_time
    ZADD queue:user:{user_id}:normal {score} {task_json}

タスク取得フロー

  1. Round Robinでユーザー選択(user_queues.rotate(-1)
  2. 選択されたユーザーに対して:
    • Critical Queue: LPOPでFIFO取得
    • Critical空なら Normal Queue: ZREVRANGEで最高スコアタスク取得
  3. Work Stealing: 担当ユーザーが空なら他ユーザーから取得

リトライ機構

  • 最大リトライ回数: 3回(デフォルト)
  • バックオフ: 指数関数的(1秒→2秒→4秒→8秒)
  • Dead Letter Queue: 不要(最大リトライ後は破棄)

Luaスクリプト設計

1. Single User Pop Script

-- 指定ユーザーのキューからタスクを取得
-- ARGV[1]: user_id
-- ARGV[2]: current_time

local user_id = ARGV[1]
local current_time = tonumber(ARGV[2])

-- Try critical queue first (FIFO)
local critical_key = "queue:user:" .. user_id .. ":critical"
local critical_task = redis.call('LPOP', critical_key)
if critical_task then
    return critical_task
end

-- If no critical task, try normal queue (priority-based)
local normal_key = "queue:user:" .. user_id .. ":normal"
local tasks = redis.call('ZREVRANGE', normal_key, 0, 0, 'WITHSCORES')

if #tasks >= 2 then
    local task = tasks[1]
    local score = tonumber(tasks[2])
    local task_obj = cjson.decode(task)
    
    -- Check if task is ready to execute
    if task_obj.execute_after <= current_time then
        redis.call('ZREM', normal_key, task)
        return task
    end
end

return nil

2. Normal Task Push Script

-- Normal Queueにタスクを追加
-- KEYS[1]: queue_key
-- ARGV[1]: task_json
-- ARGV[2]: score (Python側で計算済み)

local queue_key = KEYS[1]
local task_json = ARGV[1]
local score = tonumber(ARGV[2])

return redis.call('ZADD', queue_key, score, task_json)

3. Retry Task Script

-- リトライタスクの処理
-- 指数バックオフでexecute_afterを更新

local queue_key = KEYS[1]
local queue_type = ARGV[1]  -- "critical" or "normal"
local task_json = ARGV[2]
local retry_count = tonumber(ARGV[3])
local current_time = tonumber(ARGV[4])
local new_score = tonumber(ARGV[5])  -- Python側で再計算済み

-- Calculate exponential backoff
local delay = math.pow(2, retry_count - 1)
local execute_after = current_time + delay

-- Update task
local task_obj = cjson.decode(task_json)
task_obj.execute_after = execute_after
task_obj.retry_count = retry_count
local updated_task = cjson.encode(task_obj)

if queue_type == "critical" then
    return redis.call('LPUSH', queue_key, updated_task)
else
    -- Use recalculated score for normal tasks
    return redis.call('ZADD', queue_key, new_score, updated_task)
end

プロジェクト構成

/Users/myui/workspace/myui/fairque/
├── pyproject.toml                # uv configuration
├── README.md                     # Project documentation
├── fairque/              # Main package
│   ├── __init__.py
│   ├── queue.py                  # fairque implementation
│   ├── worker.py                 # Worker implementation
│   ├── models.py                 # Pydantic models (Task, Config)
│   ├── config.py                 # Configuration loader
│   ├── lua_scripts/              # Lua scripts directory
│   │   ├── __init__.py
│   │   ├── single_user_pop.lua   # Single user pop script
│   │   ├── normal_task_push.lua  # Normal task push script
│   │   └── retry_task.lua        # Retry task script
│   └── utils.py                  # Utility functions
├── config/
│   └── worker_config.yaml        # Worker configuration template
├── examples/
│   ├── producer.py               # Task producer example
│   └── consumer.py               # Worker consumer example
└── tests/
    ├── __init__.py
    ├── test_queue.py             # Queue tests
    ├── test_worker.py            # Worker tests
    └── test_lua_scripts.py       # Lua script tests

Priority計算の詳細例

スコア計算例

# Task作成時
task = Task(
    task_id="task-001",
    user_id="user-1", 
    priority=5,  # 最高優先度
    created_at=1000.0
)

# 時間経過後のスコア計算
current_time = 1100.0  # 100秒経過
elapsed_time = 100.0
priority_weight = 5/5 = 1.0

score = 1000.0 + (1.0 * 100.0) = 1100.0

# Priority 1のタスクの場合
priority_weight = 1/5 = 0.2
score = 1000.0 + (0.2 * 100.0) = 1020.0

実際の優先度動作

  • Priority 5: 時間経過とともに急速にスコア上昇
  • Priority 3: 中程度のスコア上昇
  • Priority 1: 緩やかなスコア上昇
  • 結果: 高優先度タスクが適切に処理され、低優先度タスクも時間経過で処理機会を得る

実装要件

Type Safety

  • 全てのメソッドに型注釈を追加
  • PydanticベースのDataclass使用
  • Union型でsync/async Redis両対応

エラーハンドリング

  • Redis接続エラーの適切な処理
  • JSON serialization/deserializationエラー処理
  • Lua script実行エラー処理

パフォーマンス

  • サーバーサイドLuaスクリプト優先
  • アトミックな操作によるロック範囲最小化
  • 効率的なWork Stealing実装
  • Priority計算の最適化

設定管理

  • YAML形式の設定ファイル
  • 環境変数による設定オーバーライド
  • バリデーション機能

除外された機能

以下の機能は設計から除外:

  • User Weight Penalty: 複雑性のため削除
  • Processing Key管理: 簡素化のため削除
  • Dead Letter Queue: 要件に含まれないため除外
  • Fair Scheduling between users: ユーザー間はRound Robinのみ

実装優先度

Phase 1 (MVP)

  1. 基本的なQueue操作(push/pop)
  2. Critical/Normal Queue分離
  3. Priority重み付きスコア計算
  4. 基本的なWork Stealing

Phase 2

  1. Retry機構
  2. Worker設定管理
  3. 統計情報取得
  4. Exponential backoff実装

Phase 3

  1. 包括的なテスト
  2. パフォーマンス最適化
  3. ドキュメント整備
  4. 監視・アラート機能

品質要件

  • コードカバレッジ: 90%以上
  • 型チェック: mypy strict mode
  • Linting & Format: ruff(linting + formatting統合)
  • ドキュメント: Sphinx

Priority システムの利点

1. 柔軟な優先度制御

  • 5段階の細かい優先度設定
  • 時間経過による動的な優先度調整

2. 飢餓状態の防止

  • 低優先度タスクも時間経過で処理機会を獲得
  • Priority 1でも0.2倍の重みで時間経過の恩恵を受ける

3. 実用的なバランス

  • 緊急タスク(Priority 5)は即座に高スコア
  • 通常タスク(Priority 3)は適度な処理順序
  • 低優先度タスク(Priority 1)も適切なタイミングで処理

この設計指示書に基づいて実装を進めることで、効率的で公平性を保ったRedis FairQueueシステムを構築できます。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment