The Brain System is an intelligent event-driven personal assistant that processes incoming messages and automatically performs appropriate actions based on the content. It serves as a central coordinator for managing tasks, todos, calendar events, and various lists through AI-powered analysis of text messages, document updates, and other input sources.
The system operates on an event-driven architecture where incoming messages trigger specialized "brains" that analyze the content and determine whether to take actions like:
- Adding or completing TODO items
- Creating calendar events
- Managing shopping lists
- Processing journal entries
- Handling various document updates
- Event-Driven Architecture: All system operations are triggered by events flowing through Redis pub/sub
- Modular Brain System: Specialized AI agents ("brains") handle different types of tasks
- Asynchronous Processing: Uses Prefect for workflow orchestration and async operations
- Persistent Storage: PostgreSQL with pgvector for embeddings and Redis for caching
- AI-First Approach: Leverages OpenAI and Anthropic models for natural language understanding
Input Sources → Event Stream → Brain Router → Specialized Brains → Actions/Events

| Input Sources | Event Stream | Brain Router | Specialized Brains | Actions/Events |
| --- | --- | --- | --- | --- |
| iMessage | Redis PubSub | Filter Logic | AI Processing | Todo Updates |
| Obsidian | Event API | Brain Matching | Tool Calling | Calendar Events |
| File Watch | PostgreSQL | Event Routing | LLM Analysis | List Management |
Entry point for data generation and testing. Contains prompt templates for generating training data for todo completion scenarios.
Key Features:
- Structured data generation using Instructor + OpenAI
- Pydantic models for todo/journal validation
- Test data generation for training AI models
Core orchestration engine - The heart of the system that coordinates all brain processing.
Key Classes:
- `OneBrain`: Individual brain configuration with instructions, tools, and filtering
- `CompletedTodoMessage`, `AddTodoMessage`, `AddCalendarEventMessage`: Structured action messages
- `BRAINS`: Collection of active brain configurations
Key Functions:
- `handle_one_brain()`: Processes a single message through one brain
- `handle_one()`: Routes a message to all relevant brains
- `run_listen()`: Main event listener that processes incoming messages
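The source doesn't show how these functions are wired together; conceptually the listener drives routing roughly as follows (a minimal sketch, assuming `listen_events()` is the async generator shown later and `handle_one()` fans each event out to the entries in `BRAINS`):

```python
async def run_listen():
    # Hypothetical wiring: every incoming event is routed through all configured brains.
    async for event in listen_events():
        handle_one(event)
```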
Brain Configurations:
```python
BRAINS = [
    OneBrain(instructions_filepath="brains/allie_text_todos.md", tool_messages=[AddTodoMessage]),
    OneBrain(instructions_filepath="brains/grocery_list.md", tool_messages=[AddToListMessage]),
    OneBrain(instructions_filepath="brains/list_to_calendar.md", tool_messages=[AddCalendarEventMessage]),
    # ... more specialized brains
]
```
Event system foundation - Handles all event creation, retrieval, and API communication.
Key Classes:
- `GenericEvent`: Protocol defining the event interface
- `FetchedEvent`: Concrete event implementation returned by the API
- `PostEventResp`: Response type for event creation
Key Functions:
- `post_event()`: Creates new events in the system
- `get_events()`: Retrieves events by source and data filters
- `get_all_events()`: Retrieves the complete event history
- `try_till_not_500()`: Robust API communication with retry logic
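The signature of `try_till_not_500()` isn't shown in the source; a minimal sketch of the retry pattern it describes, using `requests` for illustration and with an assumed `request_fn` callable plus retry/backoff parameters, might look like:

```python
import time
import requests

def try_till_not_500(request_fn, retries: int = 5, delay: float = 1.0):
    """Call request_fn until it returns a non-5xx response or retries are exhausted."""
    for attempt in range(retries):
        resp = request_fn()
        if resp.status_code < 500:
            return resp
        time.sleep(delay * (attempt + 1))  # simple linear backoff between attempts
    return resp

# Usage: keep retrying the event-history endpoint while the API returns 5xx errors
resp = try_till_not_500(lambda: requests.get("http://127.0.0.1:8000/all_events"))
```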
Database schema - SQLAlchemy models for persistent storage.
Key Components:
- `Event`: Main event table with pgvector embeddings (sketched below)
- Vector embedding support for semantic search
- Environment-based configuration
- Automatic table creation
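The model definition isn't included in the source; a minimal sketch of the `Event` table, assuming the pgvector SQLAlchemy integration, JSON payloads, and the `DATABASE_URL`/`STREAM_EVENTS_TABLE` variables listed under Configuration, could look like:

```python
import os
from pgvector.sqlalchemy import Vector
from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Event(Base):
    # Hypothetical column layout; names and embedding dimension are assumptions.
    __tablename__ = os.environ.get("STREAM_EVENTS_TABLE", "events")
    id = Column(Integer, primary_key=True)
    source = Column(String, nullable=False)       # e.g. "iMessage", "Obsidian"
    data = Column(JSONB, nullable=False)          # arbitrary event payload
    embedding = Column(Vector(1536))              # embedding used for semantic search
    created_at = Column(DateTime, server_default=func.now())

# Environment-based configuration and automatic table creation, as described above
engine = create_engine(os.environ["DATABASE_URL"])
Base.metadata.create_all(engine)
```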
Shared utilities - Common functionality across the system.
Key Features:
- `Prompt`: Template system for AI prompts
- `ThreadPool`: Concurrent processing management
- `MySimpleDebouncer`: Event debouncing for file watching (a minimal debouncer is sketched after this list)
- Logging and debugging utilities
- Date/time utilities
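`MySimpleDebouncer`'s internals aren't shown; a debouncer for file-watching callbacks generally follows this pattern (the class name, `wait` interval, and callable interface here are illustrative, not the project's actual API):

```python
import threading

class SimpleDebouncer:
    """Illustrative debouncer: only the last call within `wait` seconds actually fires."""

    def __init__(self, wait: float, fn):
        self.wait = wait
        self.fn = fn
        self._timer: threading.Timer | None = None

    def __call__(self, *args, **kwargs):
        if self._timer is not None:
            self._timer.cancel()  # drop the previously scheduled call
        self._timer = threading.Timer(self.wait, self.fn, args, kwargs)
        self._timer.start()

# Usage: collapse a burst of file-change callbacks into a single invocation
on_change = SimpleDebouncer(2.0, lambda path: print("changed:", path))
```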
Structured message types - Defines event data structures and message handling.
Key Classes:
- `ProcessedByBrainEvent`: Tracks brain processing results
- `KeyValueEvent`: Simple key-value storage events
- `InfoMessageEvent`: Information/status messages
- `AssessEvent`: Event assessment and validation
AI tool integration - Handles OpenAI function calling and tool orchestration.
Key Classes:
- `MethodTool`: Wraps Python functions as AI tools
- `SendEventTool`: Specialized tool for creating events
- `MethodTools`: Collection of tools for AI interactions
Tool Usage Example:
```python
tools = MethodTools().register_tool(send_important_message)
tools.tools.extend([
    SendEventTool(event_class=CompletedTodoMessage, trigger_event=parent_event),
    SendEventTool(event_class=AddTodoMessage, trigger_event=parent_event),
])
response = ask_gpt(messages, tools)
```
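Under the hood, OpenAI tool calling expects each tool as a JSON-schema function definition. The project's `ask_gpt()` isn't shown, but a sketch of the kind of request it would issue, assuming the tool name is derived from the message's `METHOD_NAME` and the parameter schema from Pydantic's `model_json_schema()`, is:

```python
from openai import OpenAI

client = OpenAI()

# Each SendEventTool ultimately becomes a function definition like this one;
# the exact name/description/schema mapping is an assumption.
tool_defs = [
    {
        "type": "function",
        "function": {
            "name": "add_todo",  # e.g. derived from the message's METHOD_NAME
            "description": "Add a todo item for the user",
            "parameters": AddTodoMessage.model_json_schema(),  # project's Pydantic message model
        },
    }
]

messages = [{"role": "user", "content": "Can you remind me to pick up the dry cleaning tomorrow?"}]
response = client.chat.completions.create(model="gpt-4", messages=messages, tools=tool_defs)
tool_calls = response.choices[0].message.tool_calls  # None when the model decides no action is needed
```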
Event aggregation and state management - Maintains system state through event reduction.
Key Classes:
- `ManageEvents`: Central event manager with in-memory caching
- `Reducer`: Functional state reduction from event streams
- `TaskProjection`: Task state projection from events
Reducer Pattern:
```python
@reducers.reg("tasks")
def task_reducer(state: list[TaskProjection], event: GenericEvent):
    if event.source == "Add Todo":
        return [*state, TaskProjection(name=event.data["task"], status="open")]
    elif event.source == "Completed Task Brain":
        # Mark the matching task completed (assumes the event names the task in event.data["task"])
        return [
            TaskProjection(name=t.name, status="completed") if t.name == event.data["task"] else t
            for t in state
        ]
    return state
```
Event streaming - Redis pub/sub integration and event filtering.
Key Classes:
- `RedisEvent`: Event wrapper for Redis messages
- `ObsidianTodos`: Obsidian-specific todo management
- `ObsidianJournal`: Journal entry processing
Event Streaming:
```python
async def listen_events():
    # Assumes an asyncio Redis client; yields messages from the "events" channel
    # wrapped as RedisEvent objects, skipping subscribe confirmations.
    redis_pubsub = redis_client.pubsub()
    await redis_pubsub.subscribe("events")
    while True:
        msg = await redis_pubsub.get_message(timeout=999999.9)
        if msg and msg["type"] == "message":
            yield RedisEvent(fullRedisMsg=msg)
```
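The publishing side isn't shown in the source; events presumably reach this listener by being published to the same `events` channel, roughly as follows (the JSON payload shape and `publish_event` helper are assumptions):

```python
import json
import redis.asyncio as redis

async def publish_event(redis_client: redis.Redis, source: str, data: dict) -> None:
    # Serialize and publish onto the "events" channel that listen_events() subscribes to.
    await redis_client.publish("events", json.dumps({"source": source, "data": data}))
```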
File monitoring - Watches Obsidian vault for document changes.
Key Features:
- Watchdog-based file monitoring
- Debounced event processing
- Obsidian markdown file integration
- Automatic event creation for file changes
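`watch_file.py`'s exact wiring isn't shown; a minimal Watchdog observer that debounces markdown changes in the vault and turns them into events might look like this (the vault path, the `post_event` call signature, and reuse of the debouncer sketched earlier are assumptions):

```python
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class ObsidianHandler(FileSystemEventHandler):
    def __init__(self, debounced_post):
        self.debounced_post = debounced_post

    def on_modified(self, event):
        # Only react to markdown files inside the vault
        if not event.is_directory and event.src_path.endswith(".md"):
            self.debounced_post(event.src_path)

# Debounce bursts of saves, then create an "Obsidian" event (post_event signature assumed)
debounced = SimpleDebouncer(2.0, lambda path: post_event(source="Obsidian", data={"path": path}))

observer = Observer()
observer.schedule(ObsidianHandler(debounced), path="/path/to/obsidian/vault", recursive=True)
observer.start()
```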
Todo completion detection - AI-powered analysis of journal entries for task completion.
Key Function:
```python
def check_if_tasks_completed(todos: list[str], journal_entry: str) -> MarkTasksCompleted:
    # Uses GPT-4 to analyze whether the journal entry indicates that any of the
    # given todos were completed, and returns a structured MarkTasksCompleted response.
    ...
```
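The function body isn't included in the source. Since the project uses Instructor for structured outputs, one plausible implementation in that style (the `MarkTasksCompleted` fields, prompt wording, and model choice are assumptions) is:

```python
import instructor
from openai import OpenAI
from pydantic import BaseModel

class MarkTasksCompleted(BaseModel):
    # Field names are assumed; the source only says a structured response is returned.
    completed_tasks: list[str]
    reasoning: str

client = instructor.from_openai(OpenAI())

def check_if_tasks_completed(todos: list[str], journal_entry: str) -> MarkTasksCompleted:
    return client.chat.completions.create(
        model="gpt-4",
        response_model=MarkTasksCompleted,
        messages=[
            {"role": "system", "content": "Decide which of the open todos the journal entry shows as completed."},
            {"role": "user", "content": f"Open todos: {todos}\n\nJournal entry: {journal_entry}"},
        ],
    )
```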
Each brain is configured with a markdown file containing specific instructions:
- `allie_text_todos.md`: Process texts between spouses for todo updates
- `grocery_list.md`: Manage the grocery list from text messages
- `list_to_calendar.md`: Convert list items to calendar events
- `manage_my_calendar.md`: Calendar event management
- `myself_todos.md`: Personal todo management
- OpenAI (^1.22.0): GPT-4 integration for natural language processing
- Anthropic (^0.25.8): Alternative LLM support
- Instructor (^1.2.1): Structured output from LLMs using Pydantic
- pgvector (^0.2.5): Vector embeddings in PostgreSQL
- Redis (^5.0.4): Pub/sub messaging and caching
- Pydantic (^2.7.1): Data validation and serialization
- Pendulum (^3.0.0): Advanced date/time handling
- Prefect (^2.18.3): Workflow orchestration and task management
- FastAPI (^0.111.0): API framework integration
- Watchdog (^4.0.0): File system monitoring
- iMessage Reader (^0.6.1): iMessage integration
- Inngest (^0.3.13): Event-driven functions
- Textual (^0.58.1): Terminal UI framework
- Textual-dev (^1.5.1): Development tools
- Ruff (^0.4.4): Fast Python linter and formatter
```mermaid
graph TB
subgraph "Input Sources"
A[iMessage] --> E[Event Stream]
B[Obsidian Files] --> E
C[File Changes] --> E
D[Manual Events] --> E
end
subgraph "Event Processing"
E --> F[Redis PubSub]
F --> G[Brain Router]
G --> H[Filter Logic]
end
subgraph "AI Processing"
H --> I[Specialized Brains]
I --> J[OpenAI/Anthropic]
J --> K[Tool Calling]
end
subgraph "Actions"
K --> L[Todo Updates]
K --> M[Calendar Events]
K --> N[List Management]
K --> O[Notifications]
end
subgraph "Storage"
P[PostgreSQL] --> Q[Event History]
P --> R[Vector Embeddings]
S[Redis] --> T[Pub/Sub]
S --> U[Caching]
end
```
- Component: `imessage-reader` library
- Purpose: Reads the iMessage database for text processing
- Event Source: "iMessage"
- Data Format: Chat messages with contact info
- Component: File watching + markdown parsing
- Purpose: Monitors personal knowledge base
- Event Sources: "Obsidian"
- Files Monitored: Todos, Journal, Notes
- Component: External FastAPI service
- Purpose: Event persistence and retrieval
- Base URL: `http://127.0.0.1:8000`
- Endpoints: `/events`, `/get_events`, `/all_events`
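A call against this API, assuming `/events` accepts a POSTed JSON body and `/all_events` responds to GET (the payload shape is an assumption, not the service's documented schema), might look like:

```python
import requests

BASE_URL = "http://127.0.0.1:8000"

# Post a manual event; the JSON body shape here is illustrative.
resp = requests.post(f"{BASE_URL}/events", json={"source": "Manual", "data": {"note": "Buy milk"}})
resp.raise_for_status()

# Fetch the complete event history.
all_events = requests.get(f"{BASE_URL}/all_events").json()
```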
- Component: Redis pub/sub
- Purpose: Real-time event streaming
- Channel: "events"
- Configuration: Environment-based connection
- Method: Event-driven messaging
- Pattern: Publisher/subscriber
- Coordination: Through event triggers
- Method: Event sourcing with reducers
- Storage: In-memory + PostgreSQL
- Patterns: CQRS (Command Query Responsibility Segregation)
- Primary: OpenAI GPT-4
- Secondary: Anthropic Claude
- Pattern: Tool calling with structured outputs
- Validation: Pydantic models
- Natural Language Understanding: Processes text messages, journal entries, and document updates
- Context Awareness: Maintains conversation and task context
- Multi-Source Input: Handles iMessage, Obsidian, file changes, and manual events
- Auto-Detection: Identifies new tasks from conversations
- Completion Tracking: Monitors journal entries for task completion
- Status Updates: Maintains task state through event sourcing
```python
# Example task processing (simplified from the reducer shown earlier)
if event.source == "Add Todo":
    task = TaskProjection(name=event.data["task"], status="open")
elif event.source == "Completed Task Brain":
    task.status = "completed"  # the matching open task is marked completed
```
- Event Creation: Automatically creates calendar events from text
- Time Parsing: Extracts dates/times from natural language
- Scheduling: Manages recurring events and reminders
- Dynamic Lists: Manages grocery lists, todo lists, and custom lists
- Auto-Addition: Adds items based on conversation context
- State Persistence: Maintains list state across sessions
- Smart Routing: Each brain has custom filtering logic
- Relevance Detection: AI-powered relevance assessment
- Parallel Processing: Multiple brains can process same event
- Complete History: All system changes stored as events
- Replay Capability: Can reconstruct state from events
- Audit Trail: Full accountability for all actions
- Semantic Search: Find related events by meaning
- Content Similarity: Match similar requests/tasks
- Context Retrieval: Retrieve relevant historical context
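Building on the `Event` model sketched earlier, a nearest-neighbour lookup with pgvector's SQLAlchemy comparators (the embedding model name is an assumption) could be written as:

```python
from openai import OpenAI
from sqlalchemy.orm import Session

client = OpenAI()

def similar_events(session: Session, text: str, limit: int = 5):
    # Event here is the SQLAlchemy model sketched in the database schema section.
    # Embed the query text, then order stored events by cosine distance to that vector.
    query_vec = client.embeddings.create(model="text-embedding-3-small", input=text).data[0].embedding
    return (
        session.query(Event)
        .order_by(Event.embedding.cosine_distance(query_vec))
        .limit(limit)
        .all()
    )
```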
- Prefect Integration: Robust workflow management
- Retry Logic: Automatic retry on failures
- Parallel Execution: Concurrent brain processing
- Live Updates: Immediate response to new events
- Streaming: Continuous event processing
- Debouncing: Prevents duplicate processing
Required environment variables:
```
DATABASE_URL=postgresql://user:pass@localhost/dbname
STREAM_EVENTS_TABLE=events
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
OPENAI_API_KEY=your_key
ANTHROPIC_API_KEY=your_key
```
```bash
# Install dependencies
poetry install

# Run brain system
python -m brain.src.plan-resolve.brain

# Run file watcher
python -m brain.src.plan-resolve.watch_file

# Run tests
pytest
```
For deployment, the Procfile drives process management; the system is typically run under a process manager such as PM2 or systemd (an example Procfile is sketched below).
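The Procfile's contents aren't shown; mirroring the run commands above, it plausibly defines one entry per long-running process:

```
# Hypothetical Procfile; process names are illustrative, commands mirror the run commands above.
brain: python -m brain.src.plan-resolve.brain
watcher: python -m brain.src.plan-resolve.watch_file
```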
- Create an instruction file in the `brains/` directory
- Add the brain configuration to the `BRAINS` list
- Define tool messages for the brain's actions
```python
# Example brain configuration
OneBrain(
    instructions_filepath="brains/my_new_brain.md",
    version="v1",
    tool_messages=[MyCustomMessage],
)
```
- Inherit from `MessageModel`
- Define `SOURCE` and `METHOD_NAME`
- Add to the brain's tool messages
```python
class MyCustomMessage(MessageModel):
    SOURCE: ClassVar[str] = "My Custom Action"
    METHOD_NAME: ClassVar[str] = "do_custom_action"
    description: str
    data: dict
```
- Debug Logs: `debug.log` and `debug_latest.log`
- Structured Logging: JSON format with timestamps
- Real-time Monitoring: Live log streaming
- Event History: Query all events via API
- State Inspection: Check current reducer states
- Brain Performance: Monitor brain processing metrics
```
brain/
├── src/plan-resolve/
│ ├── __init__.py
│ ├── brain.py # Main orchestration engine
│ ├── main.py # Entry point and data generation
│ ├── events.py # Event system foundation
│ ├── schema.py # Database models
│ ├── shared.py # Common utilities
│ ├── tools.py # AI tool integration
│ ├── messages.py # Message type definitions
│ ├── manage_events.py # Event aggregation
│ ├── listen_events.py # Event streaming
│ ├── watch_file.py # File monitoring
│ ├── check_todos.py # Todo completion detection
│ ├── brains/ # Brain instruction files
│ │ ├── allie_text_todos.md
│ │ ├── grocery_list.md
│ │ ├── list_to_calendar.md
│ │ ├── manage_my_calendar.md
│ │ ├── myself_todos.md
│ │ └── ...
│ ├── info/ # System information
│ │ ├── action_desc_prompt.md
│ │ ├── message_structure.md
│ │ └── personal_info.md
│ └── ui/ # User interface components
├── pyproject.toml # Dependencies and build config
├── poetry.lock # Dependency lock file
├── Procfile # Process management
├── Justfile # Task automation
├── debug.log # Debug output
├── debug_latest.log # Latest debug output
└── README.md # Documentation
```
- `src/plan-resolve/`: Main source code directory
- `brains/`: Individual brain instruction files (markdown)
- `info/`: System prompts and configuration data
- `ui/`: User interface components (Textual-based)
- `pyproject.toml`: Python project configuration and dependencies
- `Procfile`: Process management for deployment
- `Justfile`: Task automation and development workflows
- `brain.md`: Base brain instructions for all brains
- Define Message Types: Create new message classes in `messages.py`
- Create Brain Instructions: Add markdown files in `brains/`
- Configure Brain: Add to the `BRAINS` list in `brain.py`
- Test Integration: Write tests in `test_*.py` files
- Deploy: Update configuration and restart services
- Unit Tests: Test individual components
- Integration Tests: Test brain-to-brain communication
- End-to-End Tests: Test complete workflows
- AI Testing: Validate AI responses and tool calling
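As a concrete example, a unit test for the reducer shown earlier (pytest is already the project's test runner; the `SimpleNamespace` stand-in for a `GenericEvent` is an assumption) might be:

```python
from types import SimpleNamespace

def test_task_reducer_adds_open_task():
    # Stand-in for a GenericEvent: the reducer only reads .source and .data.
    event = SimpleNamespace(source="Add Todo", data={"task": "Buy milk"})
    state = task_reducer([], event)
    assert state[-1].name == "Buy milk"
    assert state[-1].status == "open"
```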
- Event Tracking: All actions logged as events
- Performance Metrics: Brain processing times
- Error Handling: Comprehensive error logging
- State Monitoring: Real-time state inspection
The Brain System represents a sophisticated approach to personal automation, combining modern AI capabilities with a robust event-driven architecture to create an intelligent, responsive personal assistant that adapts to the user's messages, documents, and schedule.