- プロジェクト名:
fairque
- ディレクトリ:
/Users/myui/workspace/myui/fairque
- パッケージ管理: uv
- Python バージョン: 3.10以上
- 型注釈: 必須(Full Typing)
- コメント: 英語
- Redis 7.2.5
- Valkey 7.2.6
- Amazon MemoryDB for Redis
- Redis/Valkeyを永続化ストレージとして使用(In-memory cacheではない)
- Luaスクリプトによるサーバーサイド処理を重視(効率とアトミック性)
- ロック範囲を最小化したアトミック処理
- マルチワーカー対応
- Work Stealing機構によるロードバランシング
queue:user:{user_id}:critical # ユーザー毎のCriticalキュー (List)
queue:user:{user_id}:normal # ユーザー毎のNormalキュー (Sorted Set)
- ユーザー選択: Round Robin(Work Stealingあり)
- ユーザー内優先度: Critical → Normal(完全優先)
- Critical Queue: FIFO(先入先出)
- Normal Queue: Priority + Time-weighted Scoring
@dataclass
class Task:
task_id: str
user_id: str
priority: int # 1-5 (1=最低優先度, 5=最高優先度)
queue_type: QueueType # critical/normal
payload: Dict[str, Any]
retry_count: int = 0
max_retries: int = 3
created_at: float # タイムスタンプ
execute_after: float # 実行可能時刻
- Priority範囲: 1-5(1=最低優先度、5=最高優先度)
- Priority重み:
priority / 5
- Priority 1: weight = 0.2(最低)
- Priority 2: weight = 0.4
- Priority 3: weight = 0.6
- Priority 4: weight = 0.8
- Priority 5: weight = 1.0(最高)
def _calculate_score(self, task: Task) -> float:
current_time = time.time()
elapsed_time = current_time - task.created_at
priority_weight = task.priority / 5.0
return task.created_at + (priority_weight * elapsed_time)
特徴:
- Higher score = Higher priority
- Priority 5のタスクは時間経過とともにスコアが大きく増加
- Priority 1のタスクは時間経過の影響を最小限に抑制
- 長時間待機している高優先度タスクが適切に処理される
- 設定ファイル: YAML形式
- 担当ユーザー: マニュアル設定
- Work Stealing: Rotationベース
worker:
id: "worker-001"
assigned_users: ["user:1", "user:3", "user:5"]
steal_targets: ["user:2", "user:4", "user:6", "user:7"]
poll_interval_seconds: 1.0
task_timeout_seconds: 300
redis:
host: "localhost"
port: 6379
db: 0
user_queues = deque(["user:1", "user:2", "user:3"])
user_queues.rotate(-1) # 毎回ローテーション
# Round Robin でユーザー選択後、
# 選択されたユーザーのCritical → Normal順で処理
- Critical Task:
LPUSH queue:user:{user_id}:critical
- Normal Task:
score = task.created_at + (task.priority/5.0) * elapsed_time ZADD queue:user:{user_id}:normal {score} {task_json}
- Round Robinでユーザー選択(
user_queues.rotate(-1)
) - 選択されたユーザーに対して:
- Critical Queue:
LPOP
でFIFO取得 - Critical空なら Normal Queue:
ZREVRANGE
で最高スコアタスク取得
- Critical Queue:
- Work Stealing: 担当ユーザーが空なら他ユーザーから取得
- 最大リトライ回数: 3回(デフォルト)
- バックオフ: 指数関数的(1秒→2秒→4秒→8秒)
- Dead Letter Queue: 不要(最大リトライ後は破棄)
-- 指定ユーザーのキューからタスクを取得
-- ARGV[1]: user_id
-- ARGV[2]: current_time
local user_id = ARGV[1]
local current_time = tonumber(ARGV[2])
-- Try critical queue first (FIFO)
local critical_key = "queue:user:" .. user_id .. ":critical"
local critical_task = redis.call('LPOP', critical_key)
if critical_task then
return critical_task
end
-- If no critical task, try normal queue (priority-based)
local normal_key = "queue:user:" .. user_id .. ":normal"
local tasks = redis.call('ZREVRANGE', normal_key, 0, 0, 'WITHSCORES')
if #tasks >= 2 then
local task = tasks[1]
local score = tonumber(tasks[2])
local task_obj = cjson.decode(task)
-- Check if task is ready to execute
if task_obj.execute_after <= current_time then
redis.call('ZREM', normal_key, task)
return task
end
end
return nil
-- Normal Queueにタスクを追加
-- KEYS[1]: queue_key
-- ARGV[1]: task_json
-- ARGV[2]: score (Python側で計算済み)
local queue_key = KEYS[1]
local task_json = ARGV[1]
local score = tonumber(ARGV[2])
return redis.call('ZADD', queue_key, score, task_json)
-- リトライタスクの処理
-- 指数バックオフでexecute_afterを更新
local queue_key = KEYS[1]
local queue_type = ARGV[1] -- "critical" or "normal"
local task_json = ARGV[2]
local retry_count = tonumber(ARGV[3])
local current_time = tonumber(ARGV[4])
local new_score = tonumber(ARGV[5]) -- Python側で再計算済み
-- Calculate exponential backoff
local delay = math.pow(2, retry_count - 1)
local execute_after = current_time + delay
-- Update task
local task_obj = cjson.decode(task_json)
task_obj.execute_after = execute_after
task_obj.retry_count = retry_count
local updated_task = cjson.encode(task_obj)
if queue_type == "critical" then
return redis.call('LPUSH', queue_key, updated_task)
else
-- Use recalculated score for normal tasks
return redis.call('ZADD', queue_key, new_score, updated_task)
end
/Users/myui/workspace/myui/fairque/
├── pyproject.toml # uv configuration
├── README.md # Project documentation
├── fairque/ # Main package
│ ├── __init__.py
│ ├── queue.py # fairque implementation
│ ├── worker.py # Worker implementation
│ ├── models.py # Pydantic models (Task, Config)
│ ├── config.py # Configuration loader
│ ├── lua_scripts/ # Lua scripts directory
│ │ ├── __init__.py
│ │ ├── single_user_pop.lua # Single user pop script
│ │ ├── normal_task_push.lua # Normal task push script
│ │ └── retry_task.lua # Retry task script
│ └── utils.py # Utility functions
├── config/
│ └── worker_config.yaml # Worker configuration template
├── examples/
│ ├── producer.py # Task producer example
│ └── consumer.py # Worker consumer example
└── tests/
├── __init__.py
├── test_queue.py # Queue tests
├── test_worker.py # Worker tests
└── test_lua_scripts.py # Lua script tests
# Task作成時
task = Task(
task_id="task-001",
user_id="user-1",
priority=5, # 最高優先度
created_at=1000.0
)
# 時間経過後のスコア計算
current_time = 1100.0 # 100秒経過
elapsed_time = 100.0
priority_weight = 5/5 = 1.0
score = 1000.0 + (1.0 * 100.0) = 1100.0
# Priority 1のタスクの場合
priority_weight = 1/5 = 0.2
score = 1000.0 + (0.2 * 100.0) = 1020.0
- Priority 5: 時間経過とともに急速にスコア上昇
- Priority 3: 中程度のスコア上昇
- Priority 1: 緩やかなスコア上昇
- 結果: 高優先度タスクが適切に処理され、低優先度タスクも時間経過で処理機会を得る
- 全てのメソッドに型注釈を追加
- PydanticベースのDataclass使用
- Union型でsync/async Redis両対応
- Redis接続エラーの適切な処理
- JSON serialization/deserializationエラー処理
- Lua script実行エラー処理
- サーバーサイドLuaスクリプト優先
- アトミックな操作によるロック範囲最小化
- 効率的なWork Stealing実装
- Priority計算の最適化
- YAML形式の設定ファイル
- 環境変数による設定オーバーライド
- バリデーション機能
以下の機能は設計から除外:
- ❌ User Weight Penalty: 複雑性のため削除
- ❌ Processing Key管理: 簡素化のため削除
- ❌ Dead Letter Queue: 要件に含まれないため除外
- ❌ Fair Scheduling between users: ユーザー間はRound Robinのみ
- 基本的なQueue操作(push/pop)
- Critical/Normal Queue分離
- Priority重み付きスコア計算
- 基本的なWork Stealing
- Retry機構
- Worker設定管理
- 統計情報取得
- Exponential backoff実装
- 包括的なテスト
- パフォーマンス最適化
- ドキュメント整備
- 監視・アラート機能
- コードカバレッジ: 90%以上
- 型チェック: mypy strict mode
- Linting & Format: ruff(linting + formatting統合)
- ドキュメント: Sphinx
- 5段階の細かい優先度設定
- 時間経過による動的な優先度調整
- 低優先度タスクも時間経過で処理機会を獲得
- Priority 1でも0.2倍の重みで時間経過の恩恵を受ける
- 緊急タスク(Priority 5)は即座に高スコア
- 通常タスク(Priority 3)は適度な処理順序
- 低優先度タスク(Priority 1)も適切なタイミングで処理
この設計指示書に基づいて実装を進めることで、効率的で公平性を保ったRedis FairQueueシステムを構築できます。