Brain System - Comprehensive Summary

Overview and Purpose

The Brain System is an intelligent event-driven personal assistant that processes incoming messages and automatically performs appropriate actions based on the content. It serves as a central coordinator for managing tasks, todos, calendar events, and various lists through AI-powered analysis of text messages, document updates, and other input sources.

The system operates on an event-driven architecture where incoming messages trigger specialized "brains" that analyze the content and determine whether to take actions like:

  • Adding or completing TODO items
  • Creating calendar events
  • Managing shopping lists
  • Processing journal entries
  • Handling various document updates

Architecture and Design

Core Design Principles

  1. Event-Driven Architecture: All system operations are triggered by events flowing through Redis pub/sub
  2. Modular Brain System: Specialized AI agents ("brains") handle different types of tasks
  3. Asynchronous Processing: Uses Prefect for workflow orchestration and async operations
  4. Persistent Storage: PostgreSQL with pgvector for embeddings and Redis for caching
  5. AI-First Approach: Leverages OpenAI and Anthropic models for natural language understanding

System Flow

Input Sources → Event Stream → Brain Router → Specialized Brains → Actions/Events
     ↓              ↓              ↓              ↓              ↓
  iMessage      Redis PubSub   Filter Logic   AI Processing   Todo Updates
  Obsidian      Event API      Brain Matching  Tool Calling   Calendar Events
  File Watch    PostgreSQL     Event Routing   LLM Analysis   List Management

Key Components

Core System Files

main.py

Entry point for data generation and testing. Contains prompt templates for generating training data for todo completion scenarios.

Key Features:

  • Structured data generation using Instructor + OpenAI
  • Pydantic models for todo/journal validation
  • Test data generation for training AI models

brain.py

Core orchestration engine - The heart of the system that coordinates all brain processing.

Key Classes:

  • OneBrain: Individual brain configuration with instructions, tools, and filtering
  • CompletedTodoMessage, AddTodoMessage, AddCalendarEventMessage: Structured action messages
  • BRAINS: Collection of active brain configurations

Key Functions:

  • handle_one_brain(): Processes single message through one brain
  • handle_one(): Routes message to all relevant brains
  • run_listen(): Main event listener that processes incoming messages

Brain Configurations:

BRAINS = [
    OneBrain(instructions_filepath="brains/allie_text_todos.md", tool_messages=[AddTodoMessage]),
    OneBrain(instructions_filepath="brains/grocery_list.md", tool_messages=[AddToListMessage]),
    OneBrain(instructions_filepath="brains/list_to_calendar.md", tool_messages=[AddCalendarEventMessage]),
    # ... more specialized brains
]
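
For orientation, here is a minimal sketch of how a OneBrain configuration and the routing in handle_one() might fit together, assuming Pydantic models; the field names and the brain_accepts() filter helper are illustrative, not the actual implementation:

from pydantic import BaseModel

class OneBrain(BaseModel):
    instructions_filepath: str          # markdown file containing this brain's instructions
    tool_messages: list[type] = []      # structured action messages this brain may emit
    version: str = "v1"

    def instructions(self) -> str:
        with open(self.instructions_filepath) as f:
            return f.read()

def handle_one(event):
    # Route an incoming event to every brain whose filter accepts it
    for brain in BRAINS:
        if brain_accepts(brain, event):   # hypothetical filter helper
            handle_one_brain(brain, event)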

events.py

Event system foundation - Handles all event creation, retrieval, and API communication.

Key Classes:

  • GenericEvent: Protocol defining event interface
  • FetchedEvent: Concrete event implementation from API
  • PostEventResp: Response type for event creation

Key Functions:

  • post_event(): Creates new events in the system
  • get_events(): Retrieves events by source and data filters
  • get_all_events(): Retrieves complete event history
  • try_till_not_500(): Robust API communication with retry logic (sketched below)
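
A minimal sketch of the retry helper and event creation, assuming the Stream Events API described under Integration Points; the exact signatures are illustrative:

import time
import requests

BASE_URL = "http://127.0.0.1:8000"   # Stream Events API base URL

def try_till_not_500(method: str, url: str, max_attempts: int = 5, **kwargs):
    # Retry the request with exponential backoff until the server stops returning 5xx
    for attempt in range(max_attempts):
        resp = requests.request(method, url, **kwargs)
        if resp.status_code < 500:
            return resp
        time.sleep(2 ** attempt)
    resp.raise_for_status()

def post_event(source: str, data: dict):
    # Create a new event in the system
    return try_till_not_500("POST", f"{BASE_URL}/events", json={"source": source, "data": data})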

schema.py

Database schema - SQLAlchemy models for persistent storage.

Key Components:

  • Event: Main event table with pgvector embeddings (see the sketch below)
  • Vector embeddings support for semantic search
  • Environment-based configuration
  • Automatic table creation
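
A minimal sketch of what the Event model might look like, assuming standard SQLAlchemy declarative style with pgvector's Vector column type; column names and the embedding dimension are illustrative:

import os
from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from pgvector.sqlalchemy import Vector

Base = declarative_base()

class Event(Base):
    __tablename__ = os.environ.get("STREAM_EVENTS_TABLE", "events")

    id = Column(Integer, primary_key=True)
    source = Column(String, nullable=False)    # e.g. "iMessage", "Add Todo"
    data = Column(JSON)                        # arbitrary event payload
    embedding = Column(Vector(1536))           # vector embedding for semantic search
    created_at = Column(DateTime)

# Automatic table creation on startup
engine = create_engine(os.environ["DATABASE_URL"])
Base.metadata.create_all(engine)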

shared.py

Shared utilities - Common functionality across the system.

Key Features:

  • Prompt: Template system for AI prompts
  • ThreadPool: Concurrent processing management
  • MySimpleDebouncer: Event debouncing for file watching (sketched below)
  • Logging and debugging utilities
  • Date/time utilities
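
A minimal sketch of how MySimpleDebouncer could work, assuming a timer-based implementation; the call() method name is illustrative:

import threading

class MySimpleDebouncer:
    # Collapse a burst of calls into a single call after a quiet period
    def __init__(self, wait_seconds: float = 1.0):
        self.wait_seconds = wait_seconds
        self._timer = None

    def call(self, fn):
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self.wait_seconds, fn)
        self._timer.start()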

Message Processing

messages.py

Structured message types - Defines event data structures and message handling.

Key Classes:

  • ProcessedByBrainEvent: Tracks brain processing results
  • KeyValueEvent: Simple key-value storage events
  • InfoMessageEvent: Information/status messages
  • AssessEvent: Event assessment and validation

tools.py

AI tool integration - Handles OpenAI function calling and tool orchestration.

Key Classes:

  • MethodTool: Wraps Python functions as AI tools
  • SendEventTool: Specialized tool for creating events
  • MethodTools: Collection of tools for AI interactions

Tool Usage Example:

tools = MethodTools().register_tool(send_important_message)
tools.tools.extend([
    SendEventTool(event_class=CompletedTodoMessage, trigger_event=parent_event),
    SendEventTool(event_class=AddTodoMessage, trigger_event=parent_event),
])
response = ask_gpt(messages, tools)
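
One plausible way MethodTool could turn a plain Python function into an OpenAI tool definition, assuming Pydantic v2 for schema generation; this is a sketch, not the actual class:

import inspect
from pydantic import create_model

class MethodTool:
    def __init__(self, fn):
        self.fn = fn
        # Derive a Pydantic model from the function signature to get a JSON schema
        fields = {
            name: (param.annotation, ...)
            for name, param in inspect.signature(fn).parameters.items()
        }
        self.params_model = create_model(f"{fn.__name__}_params", **fields)

    def openai_schema(self) -> dict:
        return {
            "type": "function",
            "function": {
                "name": self.fn.__name__,
                "description": self.fn.__doc__ or "",
                "parameters": self.params_model.model_json_schema(),
            },
        }

    def call(self, **kwargs):
        return self.fn(**kwargs)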

Event Management

manage_events.py

Event aggregation and state management - Maintains system state through event reduction.

Key Classes:

  • ManageEvents: Central event manager with in-memory caching
  • Reducer: Functional state reduction from event streams
  • TaskProjection: Task state projection from events

Reducer Pattern:

@reducers.reg("tasks")
def task_reducer(state: list[TaskProjection], event: GenericEvent):
    if event.source == "Add Todo":
        return [*state, TaskProjection(name=event.data["task"], status="open")]
    elif event.source == "Completed Task Brain":
        # Mark the matching task as completed (sketch of the update step)
        return [
            t if t.name != event.data["task"] else TaskProjection(name=t.name, status="completed")
            for t in state
        ]
    return state

listen_events.py

Event streaming - Redis pub/sub integration and event filtering.

Key Classes:

  • RedisEvent: Event wrapper for Redis messages
  • ObsidianTodos: Obsidian-specific todo management
  • ObsidianJournal: Journal entry processing

Event Streaming:

async def listen_events():
    redis_pubsub = redis_client.pubsub()
    redis_pubsub.subscribe("events")
    while True:
        msg = redis_pubsub.get_message(timeout=999999.9)
        if msg:
            yield RedisEvent(fullRedisMsg=msg)

File System Integration

watch_file.py

File monitoring - Watches Obsidian vault for document changes.

Key Features:

  • Watchdog-based file monitoring
  • Debounced event processing
  • Obsidian markdown file integration
  • Automatic event creation for file changes (see the sketch below)
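
A minimal sketch of the Watchdog wiring, assuming markdown changes in the Obsidian vault trigger new events via post_event(); the vault path and handler details are illustrative:

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

VAULT_PATH = "/path/to/obsidian/vault"   # illustrative

class VaultHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.is_directory or not event.src_path.endswith(".md"):
            return
        # In the real system this call is debounced (see MySimpleDebouncer in shared.py)
        post_event("Obsidian", {"path": event.src_path})

observer = Observer()
observer.schedule(VaultHandler(), VAULT_PATH, recursive=True)
observer.start()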

check_todos.py

Todo completion detection - AI-powered analysis of journal entries for task completion.

Key Function:

def check_if_tasks_completed(todos: list[str], journal_entry: str):
    # Uses GPT-4 to analyze if journal entry indicates task completion
    # Returns structured MarkTasksCompleted response
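
A minimal sketch of how this check might be implemented with Instructor's structured outputs; the MarkTasksCompleted fields and prompt wording are illustrative:

import instructor
from openai import OpenAI
from pydantic import BaseModel

client = instructor.from_openai(OpenAI())

class MarkTasksCompleted(BaseModel):
    completed_tasks: list[str]   # the open todos the journal entry indicates are done

def check_if_tasks_completed(todos: list[str], journal_entry: str) -> MarkTasksCompleted:
    return client.chat.completions.create(
        model="gpt-4",
        response_model=MarkTasksCompleted,
        messages=[
            {"role": "system", "content": "Decide which of these open todos the journal entry shows as completed."},
            {"role": "user", "content": f"Todos: {todos}\n\nJournal entry: {journal_entry}"},
        ],
    )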

Brain Configurations

Individual Brain Files (in brains/)

Each brain is configured with a markdown file containing specific instructions:

  • allie_text_todos.md: Process texts between spouses for todo updates
  • grocery_list.md: Manage grocery list from text messages
  • list_to_calendar.md: Convert list items to calendar events
  • manage_my_calendar.md: Calendar event management
  • myself_todos.md: Personal todo management

Dependencies and Technology Stack

Core Dependencies (from pyproject.toml)

AI/ML Stack

  • OpenAI (^1.22.0): GPT-4 integration for natural language processing
  • Anthropic (^0.25.8): Alternative LLM support
  • Instructor (^1.2.1): Structured output from LLMs using Pydantic
  • pgvector (^0.2.5): Vector embeddings in PostgreSQL

Data & Messaging

  • Redis (^5.0.4): Pub/sub messaging and caching
  • Pydantic (^2.7.1): Data validation and serialization
  • Pendulum (^3.0.0): Advanced date/time handling

Workflow & Processing

  • Prefect (^2.18.3): Workflow orchestration and task management
  • FastAPI (^0.111.0): API framework integration
  • Watchdog (^4.0.0): File system monitoring

External Integrations

  • iMessage Reader (^0.6.1): iMessage integration
  • Inngest (^0.3.13): Event-driven functions

Development Tools

  • Textual (^0.58.1): Terminal UI framework
  • Textual-dev (^1.5.1): Development tools
  • Ruff (^0.4.4): Fast Python linter and formatter

System Architecture

graph TB
    subgraph "Input Sources"
        A[iMessage] --> E[Event Stream]
        B[Obsidian Files] --> E
        C[File Changes] --> E
        D[Manual Events] --> E
    end
    
    subgraph "Event Processing"
        E --> F[Redis PubSub]
        F --> G[Brain Router]
        G --> H[Filter Logic]
    end
    
    subgraph "AI Processing"
        H --> I[Specialized Brains]
        I --> J[OpenAI/Anthropic]
        J --> K[Tool Calling]
    end
    
    subgraph "Actions"
        K --> L[Todo Updates]
        K --> M[Calendar Events]
        K --> N[List Management]
        K --> O[Notifications]
    end
    
    subgraph "Storage"
        P[PostgreSQL] --> Q[Event History]
        P --> R[Vector Embeddings]
        S[Redis] --> T[Pub/Sub]
        S --> U[Caching]
    end

Integration Points

External System Integrations

iMessage Integration

  • Component: imessage-reader library
  • Purpose: Reads iMessage database for text processing
  • Event Source: "iMessage"
  • Data Format: Chat messages with contact info

Obsidian Integration

  • Component: File watching + markdown parsing
  • Purpose: Monitors personal knowledge base
  • Event Sources: "Obsidian"
  • Files Monitored: Todos, Journal, Notes

Stream Events API

  • Component: External FastAPI service
  • Purpose: Event persistence and retrieval
  • Base URL: http://127.0.0.1:8000
  • Endpoints: /events, /get_events, /all_events

Redis Integration

  • Component: Redis pub/sub
  • Purpose: Real-time event streaming
  • Channel: "events"
  • Configuration: Environment-based connection (sketched below)
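
A minimal sketch of publishing onto the shared channel with environment-based configuration; the payload shape is illustrative:

import json
import os
import redis

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    password=os.environ.get("REDIS_PASSWORD"),
)

# Publish a new event onto the "events" channel for the brain listener to pick up
r.publish("events", json.dumps({"source": "Manual", "data": {"note": "example"}}))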

Internal Integration Points

Brain-to-Brain Communication

  • Method: Event-driven messaging
  • Pattern: Publisher/subscriber
  • Coordination: Through event triggers

State Management

  • Method: Event sourcing with reducers
  • Storage: In-memory + PostgreSQL
  • Patterns: CQRS (Command Query Responsibility Segregation)

AI Model Integration

  • Primary: OpenAI GPT-4
  • Secondary: Anthropic Claude
  • Pattern: Tool calling with structured outputs
  • Validation: Pydantic models

Key Functionality

Core Features

1. Intelligent Message Processing

  • Natural Language Understanding: Processes text messages, journal entries, and document updates
  • Context Awareness: Maintains conversation and task context
  • Multi-Source Input: Handles iMessage, Obsidian, file changes, and manual events

2. Task Management

  • Auto-Detection: Identifies new tasks from conversations
  • Completion Tracking: Monitors journal entries for task completion
  • Status Updates: Maintains task state through event sourcing
# Example task processing
if event.source == "Add Todo":
    task = TaskProjection(name=event.data["task"], status="open")
elif event.source == "Completed Task Brain":
    task.status = "completed"

3. Calendar Management

  • Event Creation: Automatically creates calendar events from text
  • Time Parsing: Extracts dates/times from natural language
  • Scheduling: Manages recurring events and reminders

4. List Management

  • Dynamic Lists: Manages grocery lists, todo lists, and custom lists
  • Auto-Addition: Adds items based on conversation context
  • State Persistence: Maintains list state across sessions

5. Brain Filtering and Routing

  • Smart Routing: Each brain has custom filtering logic
  • Relevance Detection: AI-powered relevance assessment
  • Parallel Processing: Multiple brains can process the same event

Advanced Features

1. Event Sourcing

  • Complete History: All system changes stored as events
  • Replay Capability: Can reconstruct state from events (see the sketch below)
  • Audit Trail: Full accountability for all actions
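
A minimal sketch of replaying the event history through registered reducers to rebuild current state; the reducer registry shape is illustrative:

def replay(events, reducers, initial_state=None):
    # Rebuild state purely from the stored event history
    state = initial_state or {key: [] for key in reducers}
    for event in events:
        for key, reducer in reducers.items():
            state[key] = reducer(state[key], event)
    return state

# e.g. state = replay(get_all_events(), {"tasks": task_reducer})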

2. Vector Embeddings

  • Semantic Search: Find related events by meaning
  • Content Similarity: Match similar requests/tasks
  • Context Retrieval: Retrieve relevant historical context

3. Workflow Orchestration

  • Prefect Integration: Robust workflow management
  • Retry Logic: Automatic retry on failures
  • Parallel Execution: Concurrent brain processing

4. Real-time Processing

  • Live Updates: Immediate response to new events
  • Streaming: Continuous event processing
  • Debouncing: Prevents duplicate processing

Usage and Configuration

Environment Setup

Required environment variables:

DATABASE_URL=postgresql://user:pass@localhost/dbname
STREAM_EVENTS_TABLE=events
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
OPENAI_API_KEY=your_key
ANTHROPIC_API_KEY=your_key

Running the System

Development Mode

# Install dependencies
poetry install

# Run brain system
python -m brain.src.plan-resolve.brain

# Run file watcher
python -m brain.src.plan-resolve.watch_file

# Run tests
pytest

Production Mode

# Use Procfile for process management
# Typically deployed with process manager like PM2 or systemd

Configuration Options

Adding New Brains

  1. Create instruction file in brains/ directory
  2. Add brain configuration to BRAINS list
  3. Define tool messages for brain actions
# Example brain configuration
OneBrain(
    instructions_filepath="brains/my_new_brain.md",
    version="v1",
    tool_messages=[MyCustomMessage],
)

Custom Message Types

  1. Inherit from MessageModel
  2. Define SOURCE and METHOD_NAME
  3. Add to brain tool messages
class MyCustomMessage(MessageModel):
    SOURCE: ClassVar[str] = "My Custom Action"
    METHOD_NAME: ClassVar[str] = "do_custom_action"
    
    description: str
    data: dict

Monitoring and Debugging

Logging System

  • Debug Logs: debug.log and debug_latest.log
  • Structured Logging: JSON format with timestamps
  • Real-time Monitoring: Live log streaming

Event Inspection

  • Event History: Query all events via API
  • State Inspection: Check current reducer states
  • Brain Performance: Monitor brain processing metrics

File Structure

brain/
├── src/plan-resolve/
│   ├── __init__.py
│   ├── brain.py                 # Main orchestration engine
│   ├── main.py                  # Entry point and data generation
│   ├── events.py                # Event system foundation
│   ├── schema.py                # Database models
│   ├── shared.py                # Common utilities
│   ├── tools.py                 # AI tool integration
│   ├── messages.py              # Message type definitions
│   ├── manage_events.py         # Event aggregation
│   ├── listen_events.py         # Event streaming
│   ├── watch_file.py            # File monitoring
│   ├── check_todos.py           # Todo completion detection
│   ├── brains/                  # Brain instruction files
│   │   ├── allie_text_todos.md
│   │   ├── grocery_list.md
│   │   ├── list_to_calendar.md
│   │   ├── manage_my_calendar.md
│   │   ├── myself_todos.md
│   │   └── ...
│   ├── info/                    # System information
│   │   ├── action_desc_prompt.md
│   │   ├── message_structure.md
│   │   └── personal_info.md
│   └── ui/                      # User interface components
├── pyproject.toml               # Dependencies and build config
├── poetry.lock                  # Dependency lock file
├── Procfile                     # Process management
├── Justfile                     # Task automation
├── debug.log                    # Debug output
├── debug_latest.log            # Latest debug output
└── README.md                   # Documentation

Key Directory Purposes

  • src/plan-resolve/: Main source code directory
  • brains/: Individual brain instruction files (markdown)
  • info/: System prompts and configuration data
  • ui/: User interface components (Textual-based)

Important Configuration Files

  • pyproject.toml: Python project configuration and dependencies
  • Procfile: Process management for deployment
  • Justfile: Task automation and development workflows
  • brain.md: Base brain instructions for all brains

Development Workflow

Adding New Functionality

  1. Define Message Types: Create new message classes in messages.py
  2. Create Brain Instructions: Add markdown files in brains/
  3. Configure Brain: Add to BRAINS list in brain.py
  4. Test Integration: Write tests in test_*.py files
  5. Deploy: Update configuration and restart services

Testing Strategy

  • Unit Tests: Test individual components
  • Integration Tests: Test brain-to-brain communication
  • End-to-End Tests: Test complete workflows
  • AI Testing: Validate AI responses and tool calling

Monitoring and Observability

  • Event Tracking: All actions logged as events
  • Performance Metrics: Brain processing times
  • Error Handling: Comprehensive error logging
  • State Monitoring: Real-time state inspection

The Brain System represents a sophisticated approach to personal automation, combining modern AI capabilities with robust event-driven architecture to create an intelligent, responsive personal assistant that learns and adapts to user needs.
