This document outlines the event system in ControlFlow, showing the relationships between different event types and their roles in the system.
classDiagram
class Event {
+str event
+str id
+str? thread_id
+DateTime timestamp
+bool persist
+to_messages() BaseMessage
}
class UnpersistedEvent {
+bool persist = False
}
%% Core Events with LangChain Message Types
Event <|-- UnpersistedEvent
Event <|-- AgentMessage
note for AgentMessage "Maps to langchain_core.messages.AIMessage"
Event <|-- AgentToolCall
note for AgentToolCall "Uses langchain_core.messages.ToolCall"
Event <|-- EndTurn
Event <|-- ToolResult
note for ToolResult "Uses langchain_core.messages.ToolMessage"
Event <|-- UserMessage
note for UserMessage "Maps to langchain_core.messages.HumanMessage"
Event <|-- OrchestratorMessage
note for OrchestratorMessage "Maps to langchain_core.messages.SystemMessage"
%% Streaming and State Events
UnpersistedEvent <|-- AgentMessageDelta
note for AgentMessageDelta "Maps to langchain_core.messages.AIMessageChunk"
UnpersistedEvent <|-- AgentContent
UnpersistedEvent <|-- AgentContentDelta
UnpersistedEvent <|-- AgentToolCallDelta
note for AgentToolCallDelta "Uses langchain_core.messages.ToolCallChunk"
UnpersistedEvent <|-- OrchestratorStart
UnpersistedEvent <|-- OrchestratorEnd
UnpersistedEvent <|-- OrchestratorError
UnpersistedEvent <|-- AgentTurnStart
UnpersistedEvent <|-- TaskStart
UnpersistedEvent <|-- TaskSuccess
UnpersistedEvent <|-- TaskFailure
UnpersistedEvent <|-- TaskSkipped
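
The hierarchy in the diagram can be approximated in plain Python. This is a minimal sketch using standard-library dataclasses, not ControlFlow's actual implementation: the field names come from the diagram, while the default values, the event name strings, and the `content` and `delta` fields are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4


@dataclass
class Event:
    # Field names follow the class diagram; the defaults are assumptions.
    event: str = "event"
    id: str = field(default_factory=lambda: uuid4().hex)
    thread_id: Optional[str] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    persist: bool = True


@dataclass
class UnpersistedEvent(Event):
    # Streaming and lifecycle events inherit persist=False from this base.
    persist: bool = False


@dataclass
class UserMessage(Event):
    event: str = "user-message"  # hypothetical event name
    content: str = ""  # hypothetical payload field


@dataclass
class AgentMessageDelta(UnpersistedEvent):
    event: str = "agent-message-delta"  # hypothetical event name
    delta: str = ""  # hypothetical payload field


msg = UserMessage(content="hello", thread_id="t1")
chunk = AgentMessageDelta(delta="he", thread_id="t1")
print(msg.persist, chunk.persist)  # True False
```

Placing `persist = False` on a shared base class means each streaming or lifecycle event opts out of persistence by inheritance, without repeating the flag.
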
flowchart TD
User[User Input] --> UserMessage
Agent[Agent] --> AgentMessage
AgentMessage --> |Content| AgentContent
AgentMessage --> |Tool Usage| AgentToolCall
AgentToolCall --> |Execution| ToolResult
subgraph LangChain["LangChain Integration"]
direction TB
LC_Tools[LangChain Tools]
LC_Messages[LangChain Messages]
LC_Models[LangChain Chat Models]
AgentToolCall -.-> LC_Tools
AgentMessage -.-> LC_Messages
Agent -.-> LC_Models
end
subgraph Streaming["Streaming Events"]
AgentMessage -.-> AgentMessageDelta
AgentContent -.-> AgentContentDelta
AgentToolCall -.-> AgentToolCallDelta
end
subgraph Task["Task Lifecycle"]
TaskStart --> TaskSuccess
TaskStart --> TaskFailure
TaskStart --> TaskSkipped
end
subgraph Orchestration["Orchestration Events"]
OrchestratorStart --> AgentTurnStart
AgentTurnStart --> EndTurn
EndTurn --> AgentTurnStart
OrchestratorStart --> OrchestratorEnd
OrchestratorStart --> OrchestratorError
end
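
The solid edges in the "Task Lifecycle" and "Orchestration Events" subgraphs can be read as allowed transitions. The sketch below encodes only the edges drawn in the flowchart and checks a sequence of event names against them. The `TRANSITIONS` table and `follows_diagram` helper are hypothetical names, and the flowchart is a summary, so the real system may permit orderings that are not drawn here.

```python
# Edges copied from the "Task Lifecycle" and "Orchestration Events" subgraphs.
TRANSITIONS = {
    "OrchestratorStart": {"AgentTurnStart", "OrchestratorEnd", "OrchestratorError"},
    "AgentTurnStart": {"EndTurn"},
    "EndTurn": {"AgentTurnStart"},
    "TaskStart": {"TaskSuccess", "TaskFailure", "TaskSkipped"},
}


def follows_diagram(sequence: list[str]) -> bool:
    """Return True if every consecutive pair of names is an edge in TRANSITIONS."""
    return all(
        nxt in TRANSITIONS.get(cur, set())
        for cur, nxt in zip(sequence, sequence[1:])
    )


print(follows_diagram(["OrchestratorStart", "AgentTurnStart", "EndTurn", "AgentTurnStart"]))  # True
print(follows_diagram(["TaskStart", "TaskSuccess"]))  # True
print(follows_diagram(["TaskSuccess", "TaskStart"]))  # False
```

Because the flowchart draws no edge from `EndTurn` to `OrchestratorEnd`, this checker rejects that pair. That reflects the diagram as drawn, not necessarily the orchestrator's behavior.
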
sequenceDiagram
participant User
participant Flow
participant Orchestrator
participant Agent
participant LangChainTool as Tool (LangChain)
participant LangChainModel as LLM (LangChain)
participant History
User->>Flow: Create Event
Flow->>Orchestrator: handle_event_async()
alt Is Agent Message
Orchestrator->>Agent: Process Message
Agent->>LangChainModel: Generate Response
LangChainModel-->>Agent: Return Response
opt Tool Usage
Agent->>LangChainTool: Make Tool Call
Note right of LangChainTool: Using langchain_core.tools.BaseTool
LangChainTool-->>Agent: Return Result
Agent->>Orchestrator: Create Tool Result Event
end
end
Orchestrator->>Flow: add_events()
Flow->>History: Persist Events
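
The sequence above can be traced in a few lines of Python. This is a rough sketch under stated assumptions, not ControlFlow's code: events are plain dicts, the event name strings are hypothetical, `fake_llm` and `fake_tool` stand in for a LangChain chat model and tool, and the `Flow` class here collapses the Flow and History participants into one object.

```python
import asyncio


async def fake_llm(prompt: str) -> dict:
    # Stand-in for a LangChain chat model; it always asks for one tool call.
    await asyncio.sleep(0)
    return {"content": f"echo: {prompt}", "tool_query": prompt}


def fake_tool(query: str) -> str:
    # Stand-in for a LangChain tool.
    return f"result for {query}"


class Flow:
    def __init__(self) -> None:
        self.history: list[dict] = []  # stand-in for the History store

    def add_events(self, events: list[dict]) -> None:
        # Persist only events flagged persist=True.
        self.history.extend(e for e in events if e["persist"])


async def handle_event_async(flow: Flow, event: dict) -> None:
    events = [event]
    if event["event"] == "agent-message":  # alt: Is Agent Message
        response = await fake_llm(event["content"])
        # The content portion is emitted as an unpersisted AgentContent event.
        events.append(
            {"event": "agent-content", "content": response["content"], "persist": False}
        )
        if response.get("tool_query"):  # opt: Tool Usage
            result = fake_tool(response["tool_query"])
            events.append({"event": "tool-result", "content": result, "persist": True})
    flow.add_events(events)  # Orchestrator -> Flow -> History


flow = Flow()
asyncio.run(
    handle_event_async(flow, {"event": "agent-message", "content": "hi", "persist": True})
)
print([e["event"] for e in flow.history])  # ['agent-message', 'tool-result']
```

The unpersisted `agent-content` event passes through `add_events` but never reaches the stored history, which is the distinction the `persist` flag is meant to capture.
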

Core events:

- AgentMessage: Core communication from agents (maps to `langchain_core.messages.AIMessage`)
- AgentToolCall: Tool usage requests (uses `langchain_core.messages.ToolCall`)
- EndTurn: Signals the completion of an agent's turn
- ToolResult: Results from tool executions (uses `langchain_core.messages.ToolMessage`)
- UserMessage: User input into the system (maps to `langchain_core.messages.HumanMessage`)
- OrchestratorMessage: System messages to agents (maps to `langchain_core.messages.SystemMessage`)

Streaming and state events:

- AgentMessageDelta: Streaming updates to agent messages (maps to `langchain_core.messages.AIMessageChunk`)
- AgentContent: Content portion of agent messages
- AgentContentDelta: Streaming updates to content
- AgentToolCallDelta: Streaming tool call updates (uses `langchain_core.messages.ToolCallChunk`)
- OrchestratorStart/End/Error: Orchestration lifecycle events
- AgentTurnStart: Beginning of agent turns
- TaskStart/Success/Failure/Skipped: Task lifecycle events
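
For quick reference, the event-to-LangChain correspondences listed above fit in a small lookup table. This sketch stores the dotted paths as strings so that it runs without `langchain_core` installed. The `LANGCHAIN_TYPES` table and `langchain_type` helper are hypothetical names and not part of ControlFlow.

```python
from typing import Optional

# Dotted paths copied from the lists above; events without a documented
# LangChain counterpart are intentionally left out.
LANGCHAIN_TYPES = {
    "AgentMessage": "langchain_core.messages.AIMessage",
    "AgentToolCall": "langchain_core.messages.ToolCall",
    "ToolResult": "langchain_core.messages.ToolMessage",
    "UserMessage": "langchain_core.messages.HumanMessage",
    "OrchestratorMessage": "langchain_core.messages.SystemMessage",
    "AgentMessageDelta": "langchain_core.messages.AIMessageChunk",
    "AgentToolCallDelta": "langchain_core.messages.ToolCallChunk",
}


def langchain_type(event_name: str) -> Optional[str]:
    """Return the documented LangChain type for an event, or None if none is listed."""
    return LANGCHAIN_TYPES.get(event_name)


print(langchain_type("UserMessage"))  # langchain_core.messages.HumanMessage
print(langchain_type("EndTurn"))  # None
```
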
classDiagram
class Handler {
+handle(event: Event)
+on_event(event: Event)
+on_orchestrator_start()
+on_orchestrator_end()
+on_orchestrator_error()
+on_agent_message()
+on_agent_message_delta()
+on_tool_call()
+on_tool_result()
+on_orchestrator_message()
+on_user_message()
+on_end_turn()
}
class AsyncHandler {
+async handle(event: Event)
}
Handler <|-- AsyncHandler
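
One plausible way for `handle` to reach the `on_*` hooks is name-based dispatch. The sketch below is an assumption about how such a handler could work rather than ControlFlow's implementation: events are plain dicts, the event name strings are hypothetical, the hooks take the event as an argument, and `_hooks` is an invented helper.

```python
import inspect


class Handler:
    def _hooks(self, event: dict) -> list:
        # Generic hook first, then the type-specific hook if one is defined.
        name = "on_" + event["event"].replace("-", "_")
        specific = getattr(self, name, None)
        return [self.on_event] + ([specific] if callable(specific) else [])

    def handle(self, event: dict) -> None:
        for hook in self._hooks(event):
            hook(event)

    def on_event(self, event: dict) -> None:
        pass


class AsyncHandler(Handler):
    async def handle(self, event: dict) -> None:
        for hook in self._hooks(event):
            result = hook(event)
            if inspect.isawaitable(result):  # allow sync or async hooks
                await result


class Recorder(Handler):
    def __init__(self) -> None:
        self.calls: list[str] = []

    def on_event(self, event: dict) -> None:
        self.calls.append("on_event")

    def on_user_message(self, event: dict) -> None:
        self.calls.append("on_user_message")


recorder = Recorder()
recorder.handle({"event": "user-message"})
recorder.handle({"event": "task-start"})  # no specific hook defined
print(recorder.calls)  # ['on_event', 'on_user_message', 'on_event']
```

With this design a subclass overrides only the hooks it cares about, and events without a matching hook still reach `on_event`.
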
Events are stored and retrieved through the History system:
classDiagram
class History {
+get_events(thread_id, types, before_id, after_id, limit)
+add_events(thread_id, events)
}
class InMemoryHistory {
+dict[str, list[Event]] history
}
class FileHistory {
+Path base_path
}
History <|-- InMemoryHistory
History <|-- FileHistory
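
To make the `get_events` parameters concrete, here is a minimal in-memory sketch. It is not ControlFlow's `InMemoryHistory`: events are plain dicts, and several behaviors are assumptions, namely that `before_id` and `after_id` are exclusive bounds, that unknown IDs are ignored, and that `limit` keeps the most recent events.

```python
from typing import Optional


def _index_of(events: list[dict], event_id: str) -> Optional[int]:
    # Position of the event with this id, or None if it is not present.
    return next((i for i, e in enumerate(events) if e["id"] == event_id), None)


class InMemoryHistory:
    def __init__(self) -> None:
        self.history: dict[str, list[dict]] = {}

    def add_events(self, thread_id: str, events: list[dict]) -> None:
        self.history.setdefault(thread_id, []).extend(events)

    def get_events(self, thread_id, types=None, before_id=None, after_id=None, limit=None):
        events = list(self.history.get(thread_id, []))
        if after_id is not None:
            idx = _index_of(events, after_id)
            if idx is not None:
                events = events[idx + 1:]  # exclusive lower bound (assumption)
        if before_id is not None:
            idx = _index_of(events, before_id)
            if idx is not None:
                events = events[:idx]  # exclusive upper bound (assumption)
        if types is not None:
            events = [e for e in events if e["event"] in types]
        if limit is not None:
            # Keep the most recent `limit` events (assumption).
            events = events[max(len(events) - limit, 0):]
        return events


history = InMemoryHistory()
history.add_events(
    "t1",
    [
        {"id": "a", "event": "user-message"},
        {"id": "b", "event": "agent-message"},
        {"id": "c", "event": "tool-result"},
        {"id": "d", "event": "agent-message"},
    ],
)
print([e["id"] for e in history.get_events("t1", after_id="a", before_id="d")])  # ['b', 'c']
print([e["id"] for e in history.get_events("t1", types=["agent-message"], limit=1)])  # ['d']
```
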
- Event Persistence
  - Events marked with `persist=True` are stored in history
  - Events can be filtered and retrieved by type, ID, and timestamp
- Streaming Support
  - Delta events track incremental updates
  - Snapshots are maintained for state reconstruction
- Tool Integration
  - Integration with LangChain tools (`langchain_core.tools.BaseTool`)
  - Support for both synchronous and asynchronous tool execution
- Task Management
  - Events track the complete task lifecycle
  - Support for task success, failure, and skipping
- Multi-Agent Orchestration
  - Turn management through EndTurn events
  - Agent coordination via orchestrator events
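
The streaming behavior described above, deltas for incremental updates plus snapshots for state reconstruction, can be sketched as follows. This is an illustration under assumptions: events are plain dicts, the `agent-content-delta` name and the `delta` and `snapshot` keys are hypothetical, and `accumulate` is not a ControlFlow function.

```python
def accumulate(deltas: list[str]) -> list[dict]:
    """Pair each delta with a snapshot of the full content received so far."""
    events, snapshot = [], ""
    for delta in deltas:
        snapshot += delta
        events.append(
            {
                "event": "agent-content-delta",  # hypothetical event name
                "delta": delta,
                "snapshot": snapshot,
                "persist": False,  # streaming events are unpersisted
            }
        )
    return events


events = accumulate(["Hel", "lo", "!"])
print([e["snapshot"] for e in events])  # ['Hel', 'Hello', 'Hello!']
```

Carrying a snapshot on every delta lets a consumer that joins mid-stream recover the full state from the latest event alone, at the cost of repeating content in each event.
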