How Decision AI can leverage LangMem for intelligent memory management across voice sessions, Decision Packs, and multi-agent orchestration
LangMem provides a production-ready foundation for the memory layer Decision AI needs but hasn't yet built. Instead of implementing memory extraction, storage, and retrieval from scratch, LangMem offers:
- Memory Manager - Extracts semantic, episodic, and procedural memories from conversations
- Store Manager - Automatically persists memories to any BaseStore (including AsyncPostgresStore/Supabase)
- Memory Tools - Allows agents to consciously manage their own memories
- Summarization - Handles long context through intelligent summarization
- Prompt Optimizer - Refines system prompts based on conversation feedback
- Namespace Scoping - Hierarchical memory organization by user/pack/session
LangMem covers ~70% of Decision AI's memory needs out of the box:
| Need | LangMem Coverage | Gap |
|---|---|---|
| Memory extraction from conversations | Full coverage | - |
| Semantic search over memories | Full coverage | - |
| User preference tracking | Full coverage | - |
| Thread/session summarization | Full coverage | - |
| Namespace-based scoping | Full coverage | - |
| Custom extraction schemas | Full coverage | - |
| Voice session context | Partial | Need streaming-aware extraction |
| Multi-agent memory bridge | Partial | Need custom bridging layer |
| Decision outcome tracking | Not covered | Need custom implementation |
| Pack-level shared memory | Partial | Need permission model |
| Hot-path voice latency | Partial | Need optimization layer |
Recommendation: adopt LangMem as the foundation and build the remaining ~30% as a custom layer on top.
The Memory Manager is the core extraction engine that processes conversations and generates structured memories:
```python
from langmem import create_memory_manager
from pydantic import BaseModel
class DecisionMemory(BaseModel):
"""Custom schema for Decision AI memories."""
content: str
decision_type: str # 'preference', 'fact', 'outcome', 'pattern'
confidence: float
source_context: str | None = None
manager = create_memory_manager(
"anthropic:claude-3-5-sonnet-latest",
schemas=[DecisionMemory],
instructions="""Extract user preferences, decisions made,
and patterns from conversations. Note confidence levels.""",
enable_inserts=True,
enable_updates=True,
enable_deletes=True,
)
# Extract memories from conversation
memories = manager.invoke({
"messages": conversation,
"existing": previous_memories # For update/consolidation
})
```
Key features (a result-handling sketch follows this list):
- Custom Pydantic schemas for structured extraction
- Automatic deduplication and consolidation
- Update/delete existing memories when information changes
- Works with any LLM provider
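LangMem returns extracted memories as (id, content) pairs; here is a minimal sketch of consuming them, assuming the `manager` call and `DecisionMemory` schema above:

```python
# Each result pairs a stable memory id with the extracted schema instance;
# passing these back via `existing` lets later runs update or delete them.
for memory in memories:
    print(memory.id, memory.content.decision_type, memory.content.content)
```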
The Store Manager handles persistence automatically through LangGraph's BaseStore:
```python
import os

from langmem import create_memory_store_manager
from langgraph.store.postgres import AsyncPostgresStore

# Use Supabase (Postgres + pgvector) as the backing store.
# AsyncPostgresStore is opened via an async context manager,
# e.g. inside the app's async startup:
async with AsyncPostgresStore.from_conn_string(
    os.getenv("SUPABASE_DB_URL"),
    index={
        "dims": 1536,
        "embed": "openai:text-embedding-3-small",
    },
) as store:
    # await store.setup()  # once, to create tables/indexes
    memory_manager = create_memory_store_manager(
        "anthropic:claude-3-5-sonnet-latest",
        namespace=("decision_ai", "{user_id}", "memories"),
        store=store,
        query_limit=10,
    )
```
Memory tools let agents consciously manage their own memories:
```python
from langmem import create_manage_memory_tool, create_search_memory_tool
tools = [
create_manage_memory_tool(
namespace=("decision_ai", "{user_id}", "memories")
),
create_search_memory_tool(
namespace=("decision_ai", "{user_id}", "memories")
),
]
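# Wiring sketch (assumptions: create_react_agent from langgraph.prebuilt,
# and the `store` opened in the previous section):
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(
    "anthropic:claude-3-5-sonnet-latest",
    tools=tools,
    store=store,
)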
# Agent can now call:
# - manage_memory(action="create", content="User prefers dark mode")
# - search_memory(query="user preferences")
```
The short-term memory module manages long context through intelligent summarization:
```python
from langmem.short_term import SummarizationNode, summarize_messages
summarization_node = SummarizationNode(
    token_counter=model.get_num_tokens_from_messages,  # `model` is your chat model
model=summarization_model,
max_tokens=4096,
max_tokens_before_summary=8192,
max_summary_tokens=1024,
)
```
The Prompt Optimizer refines system prompts based on conversation outcomes:
```python
from langmem import create_prompt_optimizer
optimizer = create_prompt_optimizer(
"anthropic:claude-3-5-sonnet-latest",
kind="metaprompt",
config={"max_reflection_steps": 3}
)
# Improve prompt based on user feedback
optimized = optimizer.invoke({
"trajectories": [(conversation, {"user_score": 0.3})],
"prompt": current_system_prompt
})
```
The ReflectionExecutor handles background memory processing with debouncing:
```python
from langmem import ReflectionExecutor
executor = ReflectionExecutor(memory_manager)
# Defer processing until conversation settles
executor.submit(
{"messages": conversation},
after_seconds=300 # Wait 5 minutes for activity to settle
)
```
LangMem processes complete conversations, but voice sessions need streaming-aware memory extraction:
```python
from datetime import datetime

class VoiceContextManager:
    """Custom layer for voice session memory management."""

    def __init__(self, memory_manager):
        # `memory_manager` is the instance returned by create_memory_store_manager.
        self.memory_manager = memory_manager
        self.pending_utterances: list = []
        self.last_extraction_time: datetime | None = None
async def on_utterance(self, utterance: dict):
"""Handle real-time voice utterances."""
self.pending_utterances.append(utterance)
# Don't extract on every utterance - batch them
if self._should_extract():
await self._extract_memories()
def _should_extract(self) -> bool:
"""Determine if we should trigger extraction."""
# Extract after silence, after N utterances, or after time threshold
return (
len(self.pending_utterances) >= 10 or
self._silence_detected() or
self._time_threshold_exceeded()
)
async def _extract_memories(self):
"""Extract memories from buffered utterances."""
messages = self._format_as_messages(self.pending_utterances)
# Use background extraction to not block voice
await self.memory_manager.ainvoke({"messages": messages})
self.pending_utterances = []
        self.last_extraction_time = datetime.now()
```
Why custom: voice has unique requirements (see the wiring sketch after this list):
- Can't wait for conversation "end" - it's continuous
- Utterances arrive in real-time
- Must not add latency to Fast Agent responses
- Need to coordinate with Supervisor Loop timing
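A wiring sketch under assumed session hooks (`fast_agent_reply` is hypothetical), keeping extraction off the voice response path:

```python
import asyncio

voice_memory = VoiceContextManager(memory_manager)

async def on_user_utterance(utterance: dict, session) -> str:
    # Buffer/extract in a background task; never block the voice reply.
    asyncio.create_task(voice_memory.on_utterance(utterance))
    return await session.fast_agent_reply(utterance)  # hypothetical hook
```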
Decision AI's 3-Claude voice architecture needs memory bridging:
```python
import asyncio

from langgraph.store.base import BaseStore

class MultiAgentMemoryBridge:
"""Bridge memories between Fast Agent, Supervisor, and Session."""
def __init__(
self,
store: BaseStore,
user_id: str,
session_id: str
):
self.store = store
self.user_id = user_id
self.session_id = session_id
# Different namespaces for different agents
self.namespaces = {
"user": ("decision_ai", user_id, "user_memories"),
"session": ("decision_ai", user_id, session_id, "session_memories"),
"supervisor": ("decision_ai", user_id, session_id, "supervisor_context"),
}
async def get_fast_agent_context(self) -> str:
"""Get minimal context for Fast Agent (latency-critical)."""
# Only retrieve high-priority user memories
user_memories = await self.store.asearch(
self.namespaces["user"],
query="user preferences and important facts",
limit=5,
filter={"priority": "high"}
)
return self._format_for_fast_agent(user_memories)
async def get_supervisor_context(self) -> list:
"""Get full context for Supervisor (depth-critical)."""
# Retrieve all relevant memories across namespaces
results = await asyncio.gather(
self.store.asearch(self.namespaces["user"], limit=20),
self.store.asearch(self.namespaces["session"], limit=20),
)
return self._merge_results(results)
async def promote_session_memory(self, memory_id: str):
"""Promote a session memory to user-level (persists across sessions)."""
session_mem = await self.store.aget(
self.namespaces["session"],
memory_id
)
if session_mem:
await self.store.aput(
self.namespaces["user"],
key=f"promoted_{memory_id}",
value=session_mem.value
            )
```
Why custom: LangMem doesn't know about our multi-agent architecture (usage sketch after this list):
- Different agents need different memory views
- Memory promotion (session to user) is Decision AI specific
- Latency requirements differ by agent role
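A usage sketch (identifiers illustrative; runs inside an async session handler, with `store` from the Store Manager section): each agent pulls the view sized for its latency budget:

```python
bridge = MultiAgentMemoryBridge(store, user_id="u_123", session_id="s_456")

fast_context = await bridge.get_fast_agent_context()  # small, latency-critical
full_context = await bridge.get_supervisor_context()  # broad, depth-critical
```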
The Decision Outcome Tracker records decisions and their outcomes so the system can learn from them:
```python
from dataclasses import dataclass
from datetime import datetime
from uuid import uuid4

from langgraph.store.base import BaseStore

@dataclass
class PendingDecision:
    """A decision awaiting an outcome."""
    id: str
    context: str
    options: list[str]
    chosen: str
    rationale: str | None
    timestamp: datetime

class DecisionOutcomeTracker:
    """Track decisions, outcomes, and learn patterns."""

    def __init__(self, store: BaseStore, namespace: tuple[str, ...]):
        # Writes directly to the BaseStore; namespace scopes the decisions.
        self.store = store
        self.namespace = namespace
        self.pending_decisions: dict[str, PendingDecision] = {}
async def record_decision(
self,
decision_id: str,
context: str,
options_presented: list[str],
option_chosen: str,
rationale: str | None = None
):
"""Record a decision that was made."""
decision = PendingDecision(
id=decision_id,
context=context,
options=options_presented,
chosen=option_chosen,
rationale=rationale,
timestamp=datetime.now()
)
self.pending_decisions[decision_id] = decision
# Store as memory
        await self.store.aput(
            self.namespace,
key=f"decision_{decision_id}",
value={
"kind": "Decision",
"content": {
"context": context,
"chosen": option_chosen,
"rationale": rationale,
"outcome": "pending"
}
}
)
async def record_outcome(
self,
decision_id: str,
outcome: str,
satisfaction: float, # 0-1
learnings: str | None = None
):
"""Record the outcome of a previous decision."""
if decision_id in self.pending_decisions:
decision = self.pending_decisions.pop(decision_id)
# Update the decision memory with outcome
            existing = await self.store.aget(self.namespace, f"decision_{decision_id}")
if existing:
existing.value["content"]["outcome"] = outcome
existing.value["content"]["satisfaction"] = satisfaction
existing.value["content"]["learnings"] = learnings
                await self.store.aput(
                    self.namespace,
                    key=f"decision_{decision_id}",
                    value=existing.value
                )
# Extract patterns if low satisfaction
if satisfaction < 0.5:
await self._extract_failure_pattern(decision, outcome)
async def _extract_failure_pattern(self, decision, outcome):
"""Extract patterns from failed decisions for future avoidance."""
# Use LLM to extract learnings
pattern = await self._analyze_failure(decision, outcome)
        await self.store.aput(
            self.namespace,
            key=f"pattern_{uuid4()}",
value={
"kind": "Pattern",
"content": {
"type": "failure_pattern",
"context": decision.context,
"pattern": pattern,
"avoid_in_future": True
}
}
        )
```
Why custom: LangMem extracts memories but doesn't track decision outcomes (usage sketch after this list):
- We need to link decisions to results
- Learning from failures is Decision AI core functionality
- Pattern extraction from outcomes is domain-specific
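A usage sketch (ids and namespace illustrative) linking a decision to later feedback:

```python
tracker = DecisionOutcomeTracker(store, ("decision_ai", "u_123", "decisions"))

await tracker.record_decision(
    decision_id="d_001",
    context="Choosing MMM data granularity",
    options_presented=["daily", "weekly"],
    option_chosen="weekly",
)
# ...later, once results are known:
await tracker.record_outcome("d_001", outcome="model converged", satisfaction=0.9)
```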
The Streaming Summarizer adapts summarization for voice streaming:
```python
from langmem.short_term import RunningSummary, summarize_messages

class StreamingSummarizer:
    """Summarization that works with voice streaming."""

    def __init__(self, model, max_buffer: int = 20):
        self.model = model              # chat model used for summarization
        self.max_buffer = max_buffer    # messages to buffer before summarizing
        self.running_summary: RunningSummary | None = None
        self.buffer: list = []
    async def on_message(self, message: dict) -> str | None:
        """Process an incoming message; return a new summary when triggered."""
        self.buffer.append(message)
        if not self._should_summarize():
            return None
        # Fold the buffered messages into the running summary. summarize_messages
        # returns the messages to keep plus the updated RunningSummary.
        result = summarize_messages(
            self.buffer,
            running_summary=self.running_summary,
            model=self.model,
            max_tokens=4096,
            max_tokens_before_summary=8192,
            max_summary_tokens=1024,
        )
        self.running_summary = result.running_summary
        self.buffer = list(result.messages)
        return self.running_summary.summary if self.running_summary else None

    def _should_summarize(self) -> bool:
        # Simple size threshold; production would also use token counts
        # and silence detection.
        return len(self.buffer) >= self.max_buffer

def get_context_for_fast_agent(self) -> str:
"""Get current summary + recent buffer for Fast Agent."""
summary_text = self.running_summary.summary if self.running_summary else ""
recent = self._format_recent_messages(self.buffer[-5:])
return f"""## Summary
{summary_text}
## Recent Context
{recent}"""Why custom: Standard summarization waits for "end" of conversation:
- Voice sessions are continuous - no clear end
- Need incremental summarization
- Must coordinate with Fast Agent context updates
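A usage sketch (`update_curated_context` is a hypothetical session hook; `summarization_model` is the same model used in the SummarizationNode example above):

```python
summarizer = StreamingSummarizer(model=summarization_model)

async def handle_transcript_message(message: dict, session) -> None:
    summary = await summarizer.on_message(message)
    if summary is not None:
        await session.update_curated_context(summary)  # hypothetical hook
```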
The Pack Memory Loader loads Decision Pack memories into session context:
```python
from dataclasses import dataclass
from uuid import uuid4

from langgraph.store.base import BaseStore

@dataclass
class PackContext:
    """Container for the three memory scopes loaded at session start."""
    pack_memories: list
    user_memories: list
    patterns: list

class PackMemoryLoader:
    """Load and manage Decision Pack shared memories."""

    def __init__(self, store: BaseStore):
        self.store = store
async def load_pack_context(
self,
pack_id: str,
user_id: str,
query: str | None = None
) -> PackContext:
"""Load pack-level and user-level memories for a session."""
# Pack-level memories (shared across all users of this pack)
pack_memories = await self.store.asearch(
("packs", pack_id, "shared_memories"),
query=query,
limit=10
)
# User's memories within this pack
user_pack_memories = await self.store.asearch(
("packs", pack_id, "users", user_id, "memories"),
query=query,
limit=10
)
# Pack's learned patterns (from all users, anonymized)
pack_patterns = await self.store.asearch(
("packs", pack_id, "patterns"),
query=query,
limit=5
)
return PackContext(
pack_memories=pack_memories,
user_memories=user_pack_memories,
patterns=pack_patterns
)
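
    def _anonymize_memory(self, memory: dict) -> dict:
        # Sketch added for completeness: strip user-identifying fields before
        # sharing (the field names here are assumptions, not a fixed schema).
        scrubbed = dict(memory)
        for field in ("user_id", "session_id", "source_context"):
            scrubbed.pop(field, None)
        return scrubbed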
async def contribute_to_pack(
self,
pack_id: str,
memory: dict,
anonymize: bool = True
):
"""Contribute a memory back to the pack's shared knowledge."""
if anonymize:
memory = self._anonymize_memory(memory)
await self.store.aput(
("packs", pack_id, "shared_memories"),
key=str(uuid4()),
value=memory
        )
```
Why custom: Decision Packs have unique memory requirements (permission sketch after this list):
- Pack-level shared memories (templates, patterns)
- User-specific memories within pack context
- Memory contribution/learning from usage
- Permission model for shared vs private
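The gap table above flags the shared-vs-private permission model as the missing piece; here is a minimal sketch of what that check could look like (all names are assumptions, not LangMem API):

```python
from enum import Enum

class PackMemoryScope(str, Enum):
    SHARED = "shared"    # visible to every user of the pack
    PRIVATE = "private"  # visible only to the contributing user

def can_read(scope: PackMemoryScope, requester_id: str, owner_id: str) -> bool:
    """Gate reads of pack memories by scope and ownership."""
    return scope is PackMemoryScope.SHARED or requester_id == owner_id
```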
```
+-------------------------------------------------------------------------------------+
| LANGMEM INTEGRATION ARCHITECTURE |
+-------------------------------------------------------------------------------------+
| |
| LANGMEM CORE |
| +-----------------------------------------------------------------------+ |
| | | |
| | +--------------+ +--------------+ +--------------+ +--------------+ |
| | | Memory | | Store | | Memory | | Prompt | |
| | | Manager | | Manager | | Tools | | Optimizer | |
| | +------+-------+ +------+-------+ +------+-------+ +------+-------+ |
| | | | | | |
| | +-----------------+-----------------+-----------------+ |
| | | | |
| +---------------------------+-----------------+------------------------------------+
| | | |
| ==========================================================================================
| |
| CUSTOM DECISION AI LAYER |
| +-----------------------------------------------------------------------+ |
| | | |
| | +-----------------+ +-----------------+ +-----------------+ | |
| | | Voice Context | | Multi-Agent | | Decision | | |
| | | Manager | | Memory Bridge | | Outcome Tracker | | |
| | | | | | | | | |
| | | * Streaming | | * Fast Agent | | * Record | | |
| | | extraction | | context | | decisions | | |
| | | * Utterance | | * Supervisor | | * Track | | |
| | | batching | | context | | outcomes | | |
| | | * Latency | | * Memory | | * Extract | | |
| | | optimization | | promotion | | patterns | | |
| | +-----------------+ +-----------------+ +-----------------+ | |
| | | |
| | +-----------------+ +-----------------+ | |
| | | Streaming | | Pack Memory | | |
| | | Summarizer | | Loader | | |
| | | | | | | |
| | | * Incremental | | * Pack-level | | |
| | | summaries | | memories | | |
| | | * Voice-aware | | * User-pack | | |
| | | thresholds | | memories | | |
| | +-----------------+ | * Contribute | | |
| | | back | | |
| | +-----------------+ | |
| +-----------------------------------------------------------------------+ |
| |
| ==========================================================================================
| |
| ASYNCPOSTGRESSTORE (SUPABASE) |
| +-----------------------------------------------------------------------+ |
| | | |
| | +-------------------------------------------------------------+ | |
| | | NAMESPACE HIERARCHY | | |
| | | | | |
| | | decision_ai/ | | |
| | | +-- {user_id}/ | | |
| | | | +-- memories/ # User-level persistent | | |
| | | | +-- preferences/ # User preferences | | |
| | | | +-- patterns/ # Learned user patterns | | |
| | | | +-- {session_id}/ # Session-specific | | |
| | | | +-- session_memories/ | | |
| | | | +-- supervisor_context/ | | |
| | | | | | |
| | | +-- packs/ | | |
| | | +-- {pack_id}/ | | |
| | | +-- shared_memories/ # Pack-level knowledge | | |
| | | +-- patterns/ # Pack-learned patterns | | |
| | | +-- users/ # Per-user within pack | | |
| | | +-- {user_id}/ | | |
| | | +-- memories/ | | |
| | | | | |
| | +-------------------------------------------------------------+ | |
| | | |
| | pgvector extension for semantic search | |
| | Automatic embedding on insert | |
| | | |
| +-----------------------------------------------------------------------+ |
| |
+-------------------------------------------------------------------------------------+
```
LangMem supports three memory types that map to Decision AI needs:
| Memory Type | Human Analogy | Decision AI Use Case |
|---|---|---|
| Semantic | Facts & Knowledge | User preferences, domain knowledge, pack patterns |
| Episodic | Past Experiences | Previous decisions, conversation history, outcomes |
| Procedural | How to do things | System prompts, agent behaviors, learned procedures |
Example schemas for each type:
```python
from datetime import datetime

from pydantic import BaseModel

class UserPreference(BaseModel):
"""Semantic memory: User preferences."""
category: str # 'communication', 'analysis', 'data_format'
preference: str
strength: float # How strongly they prefer this
learned_from: str # Where we learned this
class DomainKnowledge(BaseModel):
"""Semantic memory: Domain knowledge from packs."""
domain: str # 'mmm', 'financial_planning', etc.
fact: str
confidence: float
    source: str

class DecisionEpisode(BaseModel):
"""Episodic memory: A decision that was made."""
context: str
options: list[str]
chosen: str
outcome: str | None
satisfaction: float | None
timestamp: datetime
class ConversationSummary(BaseModel):
"""Episodic memory: Summary of a conversation."""
session_id: str
main_topics: list[str]
key_decisions: list[str]
action_items: list[str]
    user_sentiment: str

class AgentBehavior(BaseModel):
"""Procedural memory: How agents should behave."""
trigger: str # When this applies
behavior: str # What to do
reasoning: str # Why
    learned_from: list[str]  # Feedback that shaped this
```
LangMem namespaces enable Decision AI's permission model:
```python
# User-level memories (persist across sessions)
USER_NAMESPACE = ("decision_ai", "{user_id}", "memories")
# Session-level memories (ephemeral, within single session)
SESSION_NAMESPACE = ("decision_ai", "{user_id}", "{session_id}", "session_memories")
# Pack-level shared (all users of this pack)
PACK_SHARED_NAMESPACE = ("packs", "{pack_id}", "shared_memories")
# User within pack (user's experience with this specific pack)
PACK_USER_NAMESPACE = ("packs", "{pack_id}", "users", "{user_id}", "memories")
# Supervisor working context (Fast Agent reads this)
SUPERVISOR_NAMESPACE = ("decision_ai", "{user_id}", "{session_id}", "supervisor_context")
```
Custom Pydantic schemas define what memories look like for Decision AI:
```python
from pydantic import BaseModel, Field
from typing import Literal
class DecisionAIMemory(BaseModel):
"""Base schema for all Decision AI memories."""
content: str
memory_type: Literal["preference", "fact", "decision", "pattern", "episode"]
confidence: float = Field(ge=0, le=1, default=0.8)
source: str # 'user_stated', 'inferred', 'pack_default'
class UserPreferenceMemory(DecisionAIMemory):
"""User preference memory."""
memory_type: Literal["preference"] = "preference"
category: str # 'response_style', 'data_format', 'analysis_depth'
strength: float = Field(ge=0, le=1) # How strongly they prefer
class DecisionOutcomeMemory(DecisionAIMemory):
"""Memory of a decision and its outcome."""
memory_type: Literal["decision"] = "decision"
context: str
options_presented: list[str]
option_chosen: str
outcome: str | None = None
satisfaction: float | None = None
class PatternMemory(DecisionAIMemory):
"""Learned pattern memory."""
memory_type: Literal["pattern"] = "pattern"
pattern_type: Literal["success", "failure", "preference"]
conditions: list[str] # When this pattern applies
    recommendation: str  # What to do/avoid
```
LangMem distinguishes between hot-path and cold-path memory processing:
| Path | Timing | Decision AI Use |
|---|---|---|
| Hot-Path | During conversation | Fast Agent memory tools |
| Cold-Path | Background, after activity | Supervisor memory extraction |
```python
from langgraph.func import entrypoint

# HOT PATH: Fast Agent uses memory tools during conversation
@entrypoint(store=store)
async def fast_agent(message: str, curated_context: str):
# Search memories as part of response generation
memories = await search_memory_tool.ainvoke({
"query": message,
"limit": 3
})
response = await llm.ainvoke(
[{"role": "system", "content": curated_context}] +
[{"role": "user", "content": f"Memories: {memories}\n\n{message}"}]
)
return response
# COLD PATH: Supervisor extracts memories in background
async def supervisor_background_extraction(conversation_log: list):
# Process after conversation settles
await reflection_executor.submit(
{"messages": conversation_log},
after_seconds=300 # 5 minute delay
    )
```
How memories flow through the 3-Claude voice architecture:
```
+-------------------------------------------------------------------------------------+
| VOICE SESSION MEMORY FLOW |
+-------------------------------------------------------------------------------------+
| |
| USER SPEAKS |
| | |
| v |
| +-----------------+ |
| | Voice Context | Buffer utterances, batch for extraction |
| | Manager | Don't block voice response |
| +--------+--------+ |
| | |
| | writes to |
| v |
| +-----------------+ +-----------------+ |
| | conversation_log | <----- | FAST AGENT | Responds immediately |
| +--------+--------+ | (Haiku) | Reads curated_context |
| | +--------+--------+ NO memory writes |
| | | |
| | polls every 5s | reads |
| | | |
| v v |
| +-----------------+ +-----------------+ |
| | SUPERVISOR | ------> | curated_context | |
| | (Opus) | writes +-----------------+ |
| +--------+--------+ |
| | |
| | extracts memories via |
| | |
| v |
| +-------------------------------------------------------------------------+ |
| | LANGMEM STACK | |
| | | |
| | +--------------+ +--------------+ +--------------+ | |
| | | Memory | -> | Store | -> | Supabase | | |
| | | Manager | | Manager | | (pgvector) | | |
| | +--------------+ +--------------+ +--------------+ | |
| | | |
| | Extraction -------> Persistence -------> Semantic Search | |
| | | |
| +-------------------------------------------------------------------------+ |
| |
| ====================================================================================|
| |
| MEMORY PROMOTION FLOW: |
| |
| Session Memory --> [User confirms value] --> User Memory (persists) |
| |
| Example: |
| "User mentioned they prefer morning meetings" (session) |
| | |
| v |
| [User schedules morning meeting + positive feedback] |
| | |
| v |
| "User prefers morning meetings" (promoted to user-level) |
| |
+-------------------------------------------------------------------------------------+
```
Fast Agent context assembly (latency-critical):
```python
async def prepare_fast_agent_context(
session: VoiceSession,
memory_bridge: MultiAgentMemoryBridge
) -> str:
"""Prepare context for Fast Agent - must be FAST."""
# Get pre-curated context from supervisor
curated = session.curated_context
# Optionally add high-priority memories (cached, not queried)
priority_memories = await memory_bridge.get_cached_priority_memories()
# Keep total context small for latency
return f"""## Instructions
You are a helpful voice assistant. Respond conversationally.
## Curated Context
{curated}
## Key User Preferences
{priority_memories}
## Guidelines
- Be concise (voice)
- Reference thread for details
- Don't repeat what supervisor already covered
"""How Decision Packs access and contribute to memory:
```
+-------------------------------------------------------------------------------------+
| DECISION PACK MEMORY ARCHITECTURE |
+-------------------------------------------------------------------------------------+
| |
| PACK: "MMM Analyst" |
| +-------------------------------------------------------------------------+ |
| | | |
| | PACK-LEVEL MEMORIES (shared across all users) | |
| | namespace: ("packs", "mmm-analyst", "shared_memories") | |
| | | |
| | +-----------------------------------------------------------+ | |
| | | * "MMM models work best with 2+ years of data" | | |
| | | * "Always check for data seasonality before modeling" | | |
| | | * "Users often confuse ROAS with contribution" | | |
| | | * "PyMC-Marketing requires specific data format" | | |
| | +-----------------------------------------------------------+ | |
| | | |
| +-------------------------------------------------------------------------+ |
| | |
| | loads at session start |
| v |
| +-------------------------------------------------------------------------+ |
| | | |
| | USER-PACK MEMORIES (user's experience with this pack) | |
| | namespace: ("packs", "mmm-analyst", "users", "{user_id}", "memories") | |
| | | |
| | +-----------------------------------------------------------+ | |
| | | * "User prefers Bayesian over frequentist explanations" | | |
| | | * "User's data has weekly seasonality" | | |
| | | * "User typically analyzes Q4 holiday periods" | | |
| | | * "User's company uses BigQuery for data" | | |
| | +-----------------------------------------------------------+ | |
| | | |
| +-------------------------------------------------------------------------+ |
| | |
| | session extracts & contributes |
| v |
| +-------------------------------------------------------------------------+ |
| | | |
| | PACK PATTERNS (learned from all users, anonymized) | |
| | namespace: ("packs", "mmm-analyst", "patterns") | |
| | | |
| | +-----------------------------------------------------------+ | |
| | | * "When users ask about attribution, they usually want | | |
| | | channel contribution breakdown" | | |
| | | * "Data formatting errors are the #1 session failure" | | |
| | | * "Users who start with small datasets often scale up" | | |
| | +-----------------------------------------------------------+ | |
| | | |
| +-------------------------------------------------------------------------+ |
| |
| ====================================================================================|
| |
| CONTRIBUTION FLOW: |
| |
| Session discovers useful pattern |
| | |
| v |
| Supervisor extracts as memory |
| | |
| v |
| [Is pattern generalizable?] |
| | |
| +-- YES --> Anonymize + contribute to pack patterns |
| | |
| +-- NO ---> Store in user-pack namespace only |
| |
+-------------------------------------------------------------------------------------+
```
```python
from langmem import create_memory_store_manager
from langgraph.store.base import BaseStore

class PackSession:
"""A session using a Decision Pack with memory support."""
def __init__(
self,
pack_id: str,
user_id: str,
memory_store: BaseStore
):
self.pack_id = pack_id
self.user_id = user_id
self.store = memory_store
self.pack_loader = PackMemoryLoader(memory_store)
# Memory managers for different scopes
self.pack_memory_manager = create_memory_store_manager(
"anthropic:claude-3-5-sonnet-latest",
namespace=("packs", pack_id, "users", user_id, "memories"),
store=memory_store,
)
async def initialize_session_context(self) -> str:
"""Load all relevant memories for session start."""
pack_context = await self.pack_loader.load_pack_context(
self.pack_id,
self.user_id,
query=None # Load general context
)
return self._format_pack_context(pack_context)
async def process_conversation(self, messages: list):
"""Extract memories from conversation."""
# Extract to user-pack namespace
await self.pack_memory_manager.ainvoke({"messages": messages})
# Check for contribution opportunities
await self._check_contribution_opportunities(messages)
async def _check_contribution_opportunities(self, messages: list):
"""Check if any learnings should be contributed to pack."""
# Use LLM to identify generalizable patterns
patterns = await self._extract_generalizable_patterns(messages)
for pattern in patterns:
if pattern.is_generalizable and pattern.confidence > 0.8:
await self.pack_loader.contribute_to_pack(
self.pack_id,
memory=pattern.as_dict(),
anonymize=True
                )
```
Phase 1 goal: replace the current session-only memory with LangMem-backed persistent memory.
| Task | Details | Dependencies |
|---|---|---|
| Set up AsyncPostgresStore | Connect LangMem to Supabase | Supabase Pro (pgvector) |
| Implement basic user memories | User preferences across sessions | Store setup |
| Integrate with Supervisor | Supervisor extracts memories in background | User memories |
| Memory search in Fast Agent | Fast Agent can search user memories | User memories |
Success criteria (a verification sketch follows the list):
- User says "remember I prefer dark mode" -> persists across sessions
- Supervisor automatically extracts preferences from conversation
- Fast Agent can retrieve relevant memories
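A quick check for the first criterion, assuming the Phase 1 `store` below (the helper name and query are illustrative):

```python
async def check_preference_persists(store, user_id: str) -> bool:
    # In a fresh session, the stated preference should be retrievable from
    # the user-level namespace via semantic search.
    hits = await store.asearch(
        ("decision_ai", user_id, "memories"),
        query="UI theme preference",
        limit=3,
    )
    return any("dark mode" in str(item.value) for item in hits)
```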
```python
# Phase 1 implementation sketch
import os

from langmem import create_memory_store_manager, create_search_memory_tool
from langgraph.store.postgres import AsyncPostgresStore

# Setup: connect LangMem's store to Supabase. (In the real API the store is
# opened via `AsyncPostgresStore.from_conn_string(...)` as an async context
# manager; the flat form here just keeps the sketch short.)
store = AsyncPostgresStore(
    connection_string=os.getenv("SUPABASE_DB_URL"),
    index={"dims": 1536, "embed": "openai:text-embedding-3-small"}
)
user_memory_manager = create_memory_store_manager(
"anthropic:claude-3-5-sonnet-latest",
namespace=("decision_ai", "{user_id}", "memories"),
store=store,
)
# In Supervisor loop
async def supervisor_poll_cycle(session: VoiceSession):
# ... existing supervisor logic ...
# NEW: Extract memories from gap messages
if gap_messages:
await user_memory_manager.ainvoke({
"messages": self._format_gap_messages(gap_messages)
}, config={"configurable": {"user_id": session.user_id}})Goal: Optimize memory for voice session latency requirements.
| Task | Details | Dependencies |
|---|---|---|
| Voice Context Manager | Streaming-aware memory extraction | Phase 1 |
| Multi-Agent Memory Bridge | Different views for different agents | Phase 1 |
| Memory caching layer | Pre-fetch high-priority memories | Voice Context Manager |
| Streaming summarization | Incremental summaries for voice | Phase 1 |
Success criteria:
- Fast Agent response time unchanged (<1s)
- Supervisor uses full memory context
- Session memories promote to user memories when valuable
```python
# Phase 2 implementation sketch
import asyncio

from langgraph.store.base import BaseStore
from langmem import create_memory_store_manager
class OptimizedVoiceMemory:
"""Phase 2: Optimized memory for voice sessions."""
def __init__(self, store: BaseStore, user_id: str, session_id: str):
self.bridge = MultiAgentMemoryBridge(store, user_id, session_id)
self.voice_manager = VoiceContextManager(
create_memory_store_manager(
"anthropic:claude-3-5-sonnet-latest",
namespace=("decision_ai", user_id, session_id, "session"),
store=store,
)
)
        # Pre-cached context for fast access
        self._priority_cache: str = ""
        self._cache_refresh_task: asyncio.Task | None = None
async def start_session(self):
"""Initialize memory for voice session."""
# Pre-fetch priority memories
self._priority_cache = await self.bridge.get_fast_agent_context()
# Start background cache refresh
self._cache_refresh_task = asyncio.create_task(
self._refresh_cache_loop()
)
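
    async def _refresh_cache_loop(self, interval_seconds: int = 30):
        # Sketch added for completeness: refresh the cached context off the
        # hot path (the interval is an assumption).
        while True:
            await asyncio.sleep(interval_seconds)
            self._priority_cache = await self.bridge.get_fast_agent_context()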
def get_fast_agent_context(self) -> str:
"""Get context for Fast Agent - synchronous, from cache."""
        return self._priority_cache  # No await - instant
```
Phase 3 goal: full Decision Pack memory and outcome tracking.
| Task | Details | Dependencies |
|---|---|---|
| Pack Memory Loader | Load pack-level memories | Phase 1 |
| Decision Outcome Tracker | Track decisions and outcomes | Phase 1 |
| Pattern extraction | Extract patterns from outcomes | Outcome Tracker |
| Pack contribution flow | Anonymize and contribute learnings | Pack Loader |
Success criteria:
- Packs have shared memories that improve over time
- Decisions are tracked with outcomes
- Patterns emerge from successful/failed decisions
- Users contribute anonymized learnings back to packs
```python
# Phase 3 implementation sketch
from langgraph.store.base import BaseStore

# PackManifest is Decision AI's pack manifest type, defined elsewhere.
class DecisionPackWithMemory:
"""Phase 3: Full Decision Pack memory integration."""
def __init__(
self,
pack_manifest: PackManifest,
user_id: str,
store: BaseStore
):
self.manifest = pack_manifest
self.user_id = user_id
self.store = store
# Memory components
self.pack_loader = PackMemoryLoader(store)
        self.outcome_tracker = DecisionOutcomeTracker(
            store,
            namespace=("packs", pack_manifest.id, "users", user_id, "decisions"),
        )
async def run_session(self, initial_query: str):
"""Run a pack session with full memory support."""
# Load pack context
pack_context = await self.pack_loader.load_pack_context(
self.manifest.id,
self.user_id,
query=initial_query
)
# ... session execution ...
# After session: process outcomes
for decision in self.session_decisions:
await self.outcome_tracker.record_decision(
decision_id=decision.id,
context=decision.context,
options_presented=decision.options,
option_chosen=decision.chosen
)
    # Later: record outcomes as user feedback arrives
    async def on_user_feedback(self, decision_id: str, feedback: dict):
        await self.outcome_tracker.record_outcome(
            decision_id=decision_id,
            outcome=feedback["outcome"],
            satisfaction=feedback["satisfaction"]
        )
```
What we use from LangMem:
- Memory Manager - Extract structured memories from conversations
- Store Manager - Persist to Supabase via AsyncPostgresStore
- Memory Tools - Agent-controlled memory management
- Summarization - Long context management
- Prompt Optimizer - Learn from feedback
- Namespace Scoping - Hierarchical memory organization
What we build custom:
- Voice Context Manager - Streaming-aware extraction
- Multi-Agent Memory Bridge - Different views for 3-Claude architecture
- Decision Outcome Tracker - Link decisions to results
- Streaming Summarizer - Incremental voice summaries
- Pack Memory Loader - Decision Pack shared memory
Roadmap:
| Phase | Timeline | Outcome |
|---|---|---|
| Phase 1 | Weeks 1-4 | Basic persistent user memory |
| Phase 2 | Weeks 5-8 | Optimized for voice latency |
| Phase 3 | Weeks 9-12 | Full pack memory + outcomes |
Bottom line: LangMem gives us a production-ready foundation, and we build the Decision AI-specific layer on top. Total effort: ~12 weeks to a full memory system versus ~6+ months building from scratch.
LangMem transforms Decision AI from "stateless sessions" to "learning agents that remember". The 70/30 split lets us leverage battle-tested components while building the custom intelligence layer that makes Decision AI unique.