@teocns
Created December 8, 2024 15:16
ControlFlow Event System

This document outlines the event system in ControlFlow, showing the relationships between different event types and their roles in the system.

classDiagram
    class Event {
        +str event
        +str id
        +str? thread_id
        +DateTime timestamp
        +bool persist
        +to_messages() BaseMessage
    }
    class UnpersistedEvent {
        +bool persist = False
    }
    
    %% Core Events with LangChain Message Types
    Event <|-- UnpersistedEvent
    Event <|-- AgentMessage
    note for AgentMessage "Maps to langchain_core.messages.AIMessage"
    Event <|-- AgentToolCall
    note for AgentToolCall "Uses langchain_core.messages.ToolCall"
    Event <|-- EndTurn
    Event <|-- ToolResult
    note for ToolResult "Uses langchain_core.messages.ToolMessage"
    Event <|-- UserMessage
    note for UserMessage "Maps to langchain_core.messages.HumanMessage"
    Event <|-- OrchestratorMessage
    note for OrchestratorMessage "Maps to langchain_core.messages.SystemMessage"
    
    %% Streaming and State Events
    UnpersistedEvent <|-- AgentMessageDelta
    note for AgentMessageDelta "Maps to langchain_core.messages.AIMessageChunk"
    UnpersistedEvent <|-- AgentContent
    UnpersistedEvent <|-- AgentContentDelta
    UnpersistedEvent <|-- AgentToolCallDelta
    note for AgentToolCallDelta "Uses langchain_core.messages.ToolCallChunk"
    UnpersistedEvent <|-- OrchestratorStart
    UnpersistedEvent <|-- OrchestratorEnd
    UnpersistedEvent <|-- OrchestratorError
    UnpersistedEvent <|-- AgentTurnStart
    UnpersistedEvent <|-- TaskStart
    UnpersistedEvent <|-- TaskSuccess
    UnpersistedEvent <|-- TaskFailure
    UnpersistedEvent <|-- TaskSkipped

Event Flow and LangChain Integration

flowchart TD
    User[User Input] --> UserMessage
    Agent[Agent] --> AgentMessage
    AgentMessage --> |Content| AgentContent
    AgentMessage --> |Tool Usage| AgentToolCall
    AgentToolCall --> |Execution| ToolResult
    
    subgraph LangChain["LangChain Integration"]
        direction TB
        LC_Tools[LangChain Tools]
        LC_Messages[LangChain Messages]
        LC_Models[LangChain Chat Models]
        
        AgentToolCall -.-> LC_Tools
        AgentMessage -.-> LC_Messages
        Agent -.-> LC_Models
    end
    
    subgraph Streaming["Streaming Events"]
        AgentMessage -.-> AgentMessageDelta
        AgentContent -.-> AgentContentDelta
        AgentToolCall -.-> AgentToolCallDelta
    end
    
    subgraph Task["Task Lifecycle"]
        TaskStart --> TaskSuccess
        TaskStart --> TaskFailure
        TaskStart --> TaskSkipped
    end
    
    subgraph Orchestration["Orchestration Events"]
        OrchestratorStart --> AgentTurnStart
        AgentTurnStart --> EndTurn
        EndTurn --> AgentTurnStart
        OrchestratorStart --> OrchestratorEnd
        OrchestratorStart --> OrchestratorError
    end

Event Processing Flow with LangChain Components

sequenceDiagram
    participant User
    participant Flow
    participant Orchestrator
    participant Agent
    participant LangChainTool as Tool (LangChain)
    participant LangChainModel as LLM (LangChain)
    participant History
    
    User->>Flow: Create Event
    Flow->>Orchestrator: handle_event_async()
    
    alt Is Agent Message
        Orchestrator->>Agent: Process Message
        Agent->>LangChainModel: Generate Response
        LangChainModel-->>Agent: Return Response
        
        opt Tool Usage
            Agent->>LangChainTool: Make Tool Call
            Note right of LangChainTool: Using langchain_core.tools.BaseTool
            LangChainTool-->>Agent: Return Result
            Agent->>Orchestrator: Create Tool Result Event
        end
    end
    
    Orchestrator->>Flow: add_events()
    Flow->>History: Persist Events
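
The sequence above can be approximated with a small asyncio sketch. Only `handle_event_async()` and `add_events()` are names taken from the diagram; the dict-shaped events, the `tools` registry, and the `add` tool are hypothetical stand-ins, and the LLM call is omitted. It is meant as an illustration of the control flow, not as ControlFlow's implementation.

```python
import asyncio


class Flow:
    """Stand-in for the Flow participant; `history` plays the role of History."""

    def __init__(self):
        self.history = []

    def add_events(self, events):
        # Assumption: only events with persist=True (the default here) are stored.
        self.history.extend(e for e in events if e.get("persist", True))


class Orchestrator:
    def __init__(self, flow, tools):
        self.flow = flow
        self.tools = tools  # hypothetical registry: tool name -> async callable

    async def handle_event_async(self, event):
        produced = [event]
        # "alt Is Agent Message" / "opt Tool Usage" branches of the diagram.
        if event["event"] == "agent-message":
            for call in event.get("tool_calls", []):
                result = await self.tools[call["name"]](**call["args"])
                produced.append(
                    {"event": "tool-result", "name": call["name"], "result": result}
                )
        # Orchestrator ->> Flow: add_events(), then Flow ->> History: persist.
        self.flow.add_events(produced)
        return produced


async def add(a, b):
    """Hypothetical tool used only for this example."""
    return a + b
```

Running `asyncio.run(Orchestrator(Flow(), {"add": add}).handle_event_async(message))` with a message that requests the `add` tool yields the original message followed by a `tool-result` event.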

Event Types and Their Roles

Persisted Events (LangChain Message Mappings)

  • AgentMessage: Core communication from agents (maps to langchain_core.messages.AIMessage)
  • AgentToolCall: Tool usage requests (uses langchain_core.messages.ToolCall)
  • EndTurn: Signals completion of agent's turn
  • ToolResult: Results from tool executions (uses langchain_core.messages.ToolMessage)
  • UserMessage: User input into the system (maps to langchain_core.messages.HumanMessage)
  • OrchestratorMessage: System messages to agents (maps to langchain_core.messages.SystemMessage)

Unpersisted Events (State Tracking)

  • AgentMessageDelta: Streaming updates (maps to langchain_core.messages.AIMessageChunk)
  • AgentContent: Content portion of agent messages
  • AgentContentDelta: Streaming updates to content
  • AgentToolCallDelta: Streaming tool call updates (uses langchain_core.messages.ToolCallChunk)
  • OrchestratorStart/End/Error: Orchestration lifecycle events
  • AgentTurnStart: Beginning of agent turns
  • TaskStart/Success/Failure/Skipped: Task lifecycle events
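
The persisted/unpersisted split listed above can be sketched with plain dataclasses. The field names follow the `Event` class diagram; the string values of `event`, the `content` and `delta` fields, and the `persistable` helper are assumptions made for illustration and may not match ControlFlow's actual definitions.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Event:
    event: str = "base"
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    thread_id: Optional[str] = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    persist: bool = True


@dataclass
class UnpersistedEvent(Event):
    # Same fields as Event, but excluded from history by default.
    persist: bool = False


@dataclass
class UserMessage(Event):
    event: str = "user-message"  # assumed tag value
    content: str = ""            # assumed payload field


@dataclass
class AgentMessageDelta(UnpersistedEvent):
    event: str = "agent-message-delta"  # assumed tag value
    delta: str = ""                     # assumed payload field


def persistable(events):
    """Hypothetical helper: keep only the events that should reach history."""
    return [e for e in events if e.persist]
```

With this layout a streaming delta never reaches history unless `persist` is overridden explicitly, while ordinary messages are stored by default.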

Event Handlers

classDiagram
    class Handler {
        +handle(event: Event)
        +on_event(event: Event)
        +on_orchestrator_start()
        +on_orchestrator_end()
        +on_orchestrator_error()
        +on_agent_message()
        +on_agent_message_delta()
        +on_tool_call()
        +on_tool_result()
        +on_orchestrator_message()
        +on_user_message()
        +on_end_turn()
    }
    
    class AsyncHandler {
        +async handle(event: Event)
    }
    
    Handler <|-- AsyncHandler
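
One plausible way for `handle` to route events to the `on_*` hooks is name-based dispatch. The sketch below assumes that each event's `event` field holds a dash-separated tag such as `"user-message"` and that each hook receives the event as its argument; both are assumptions for illustration, not confirmed details of ControlFlow.

```python
from dataclasses import dataclass


@dataclass
class Event:
    event: str  # assumed tag, e.g. "user-message" or "tool-result"


class Handler:
    def handle(self, event: Event) -> None:
        # Generic hook first, then the type-specific hook if one is defined.
        self.on_event(event)
        hook = getattr(self, "on_" + event.event.replace("-", "_"), None)
        if callable(hook):
            hook(event)

    def on_event(self, event: Event) -> None:
        pass


class CountingHandler(Handler):
    """Example subclass that records every event and counts user messages."""

    def __init__(self):
        self.seen = []
        self.user_messages = 0

    def on_event(self, event: Event) -> None:
        self.seen.append(event.event)

    def on_user_message(self, event: Event) -> None:
        self.user_messages += 1
```

A subclass only overrides the hooks it cares about; events without a matching hook still pass through `on_event`.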

Event History

Events are stored and retrieved through the History system:

classDiagram
    class History {
        +get_events(thread_id, types, before_id, after_id, limit)
        +add_events(thread_id, events)
    }
    
    class InMemoryHistory {
        +dict[str, list[Event]] history
    }
    
    class FileHistory {
        +Path base_path
    }
    
    History <|-- InMemoryHistory
    History <|-- FileHistory
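
A minimal in-memory store with the two methods from the diagram might look like the following. The filter semantics are assumptions: `after_id` and `before_id` are treated as exclusive bounds on insertion order, `limit` keeps the most recent matches, and the `persist` check is placed in `add_events`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Optional, Sequence


@dataclass
class Event:
    id: str
    event: str
    persist: bool = True


class InMemoryHistory:
    def __init__(self):
        self.history = {}  # thread_id -> list of events, in insertion order

    def add_events(self, thread_id: str, events: Iterable[Event]) -> None:
        stored = self.history.setdefault(thread_id, [])
        stored.extend(e for e in events if e.persist)

    def get_events(
        self,
        thread_id: str,
        types: Optional[Sequence[str]] = None,
        before_id: Optional[str] = None,
        after_id: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list:
        events = self.history.get(thread_id, [])
        ids = [e.id for e in events]
        # Exclusive window between after_id and before_id (assumed semantics).
        start = ids.index(after_id) + 1 if after_id in ids else 0
        end = ids.index(before_id) if before_id in ids else len(events)
        window = events[start:end]
        if types is not None:
            window = [e for e in window if e.event in types]
        if limit is not None:
            window = window[-limit:] if limit > 0 else []
        return list(window)
```

A file-backed variant would keep the same interface and swap the dictionary for reads and writes under `base_path`.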

Key Features

  1. Event Persistence

    • Events marked with persist=True are stored in history
    • Events can be filtered and retrieved by thread, type, and ID range (before_id / after_id), with an optional limit
  2. Streaming Support

    • Delta events track incremental updates
    • Snapshots are maintained alongside deltas so that state can be reconstructed
  3. Tool Integration

    • Tools are LangChain tools (langchain_core.tools.BaseTool); calls and results use ToolCall and ToolMessage
    • Support for both synchronous and asynchronous tool execution
  4. Task Management

    • Events track complete task lifecycle
    • Support for task success, failure, and skipping
  5. Multi-Agent Orchestration

    • Turn management through EndTurn events
    • Agent coordination via orchestrator events
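
The streaming feature listed above can be illustrated with a generator that pairs each incremental chunk with a running snapshot. The `ContentDelta` class and its `delta` and `snapshot` fields are hypothetical names chosen for this sketch, not ControlFlow's actual event fields.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class ContentDelta:
    delta: str     # the newly streamed chunk
    snapshot: str  # everything streamed so far, including this chunk


def stream_deltas(chunks: Iterable[str]) -> Iterator[ContentDelta]:
    """Yield one delta event per chunk, carrying the accumulated text."""
    snapshot = ""
    for chunk in chunks:
        snapshot += chunk
        yield ContentDelta(delta=chunk, snapshot=snapshot)
```

A consumer that joins a stream late can read the latest `snapshot` instead of replaying every earlier delta.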