Memory is fundamental to human intelligence—it shapes identity, guides decisions, and enables learning, adaptation, and meaningful relationships. In communication, memory allows us to recall past interactions, infer preferences, and maintain coherent, context-rich exchanges over long periods. In contrast, current AI agents powered by large language models (LLMs) are limited by fixed context windows and lack persistent memory, leading to forgetfulness, contradictions, and a diminished user experience. Even as LLMs’ context windows grow, they cannot match the human ability to retain and retrieve relevant information across sessions and topics. This limitation is especially problematic in domains requiring continuity and trust, such as healthcare and education. To overcome these challenges, AI agents need robust memory systems that can selectively store, consolidate, and retrieve important information—mirroring human cognition. Such systems will enable AI agents to maintain consistent personas, track evolving user preferences, and build upon prior exchanges, transforming them into reliable, long-term collaborators.
To address these limitations of current AI agents, namely the lack of persistent memory and the resulting inability to build long-term relationships, we propose a distributed memory system for flink-agents that mirrors human cognitive processes.
The solution implements three memory types: Sensory Memory (internal events), Short-term Memory (recent history), and Long-term Memory (semantic retrieval). It leverages Flink's distributed state management with a hybrid backend combining history store and vector store, directly addressing LLM context window limitations through persistent, searchable memory.
Additionally, Knowledge provides shared long-term memory accessible across all agent instances, stored externally without Flink checkpointing, enabling domain-specific expertise and consistent collaboration in trust-critical domains.
The Flink-Agents Memory System introduces a hybrid state backend that combines two specialized state backends:
- History Store: Tracks all memory operations for audit trails and serves retrieval of short-term memories. The default implementation uses Flink's built-in state backends.
- Vector Store: Provides vector search capabilities for long-term semantic memory retrieval. The default implementation is based on embedded Lucene instances.
This hybrid approach optimizes both storage efficiency and search performance, unlike traditional single-backend solutions.
graph LR
subgraph "Flink Cluster"
subgraph "Task Manager 1"
A1[Agent Instance 1]
M1[Memory 1]
subgraph "Hybrid State Backend 1"
S1_R[History Store]
S1_L[Vector Store]
end
end
subgraph "Task Manager 2"
A2[Agent Instance 2]
M2[Memory 2]
subgraph "Hybrid State Backend 2"
S2_R[History Store]
S2_L[Vector Store]
end
end
subgraph "Task Manager N"
AN[Agent Instance N]
MN[Memory N]
subgraph "Hybrid State Backend N"
SN_R[History Store]
SN_L[Vector Store]
end
end
end
A1 --> M1
A2 --> M2
AN --> MN
M1 --> S1_R
M1 --> S1_L
M2 --> S2_R
M2 --> S2_L
MN --> SN_R
MN --> SN_L
style S1_R fill:#ffebee
style S1_L fill:#e8f5e8
style S2_R fill:#ffebee
style S2_L fill:#e8f5e8
style SN_R fill:#ffebee
style SN_L fill:#e8f5e8
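As a rough illustration of the hybrid approach, a unified interface can route each operation to the appropriate engine. The sketch below is illustrative only: the class and method names are hypothetical, and plain Python structures stand in for the Flink-managed history store and the Lucene-backed vector store.

```python
from typing import Any, Dict, List


class HybridStateBackend:
    """Illustrative router that dispatches operations to two engines.

    History operations go to an append-only history store; semantic
    operations go to a vector store. Both are stubbed with plain
    Python structures here.
    """

    def __init__(self) -> None:
        self._history: List[Dict[str, Any]] = []    # stands in for Flink state
        self._vectors: Dict[str, List[float]] = {}  # stands in for Lucene

    def append_history(self, entry: Dict[str, Any]) -> None:
        """Route audit/history writes to the history store."""
        self._history.append(entry)

    def recent(self, n: int) -> List[Dict[str, Any]]:
        """Short-term retrieval: last n entries, newest first."""
        return list(reversed(self._history[-n:]))

    def index_vector(self, memory_id: str, vector: List[float]) -> None:
        """Route embeddings to the vector store for semantic search."""
        self._vectors[memory_id] = vector


backend = HybridStateBackend()
backend.append_history({"op": "ADD", "memory_id": "m1"})
backend.append_history({"op": "UPDATE", "memory_id": "m1"})
backend.index_vector("m1", [0.1, 0.2])
```

The point of the split is that each engine only ever sees the access pattern it is optimized for: sequential appends and range reads on one side, similarity search on the other.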
The system supports different types of memories, each with specialized storage in our hybrid state backend. Sensory memory operates internally and is not exposed to users:
graph TD
subgraph "Memory Types"
SM["Sensory Memory<br/>Agent Events<br/>MapState<br/>Internal Only"]
STM["Short-term Memory<br/>Recent History<br/>History Storage<br/>get_history(n)"]
LTM["Long-term Memory<br/>Semantic Search<br/>Vector Storage<br/>search(query)"]
end
subgraph "Hybrid State Backend"
HB_R["History Store"]
HB_L["Vector Store"]
end
STM -.-> HB_R
LTM -.-> HB_L
style SM fill:#e1f5fe
style STM fill:#f3e5f5
style LTM fill:#e8f5e8
style HB_R fill:#ffebee
style HB_L fill:#e8f5e8
The system supports three distinct types of memories, each serving different purposes and using different storage mechanisms:
Sensory memory captures real-time events from agents and stores them in Flink's keyed state using MapState. This represents the immediate sensory input that agents receive from their environment. Sensory memory is completely invisible to users and operates automatically in the background.
Characteristics:
- Storage: Flink MapState (in-memory + checkpointed)
- Retention: Configurable window size
- Access Pattern: Lookup
- Performance: Nanosecond-level latency
- Visibility: Internal only - no user API access
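The windowed retention behavior can be sketched in-process as follows; this is a simplified stand-in that uses an `OrderedDict` in place of Flink's keyed MapState, and the class name is illustrative, not part of the proposed API:

```python
from collections import OrderedDict
from typing import Any


class SensoryBuffer:
    """Keeps only the most recent `window_size` events,
    mimicking a MapState with a configurable retention window."""

    def __init__(self, window_size: int) -> None:
        self.window_size = window_size
        self._events: "OrderedDict[int, Any]" = OrderedDict()
        self._seq = 0

    def record(self, event: Any) -> None:
        self._events[self._seq] = event
        self._seq += 1
        # Evict the oldest entries beyond the retention window
        while len(self._events) > self.window_size:
            self._events.popitem(last=False)


buf = SensoryBuffer(window_size=3)
for e in ["e1", "e2", "e3", "e4"]:
    buf.record(e)
# Only the last three events are retained
```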
Short-term memory maintains a configurable history of recent agent interactions and experiences. It uses history storage for persistence and provides fast access to recent memories.
Characteristics:
- Storage: History storage (local to each TaskManager)
- Retention: Persistent, configurable TTL
- Access Pattern: Sequential, indexed by time
- Performance: Sub-millisecond latency
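The sequential, time-indexed access pattern with TTL-based expiry can be sketched as below. This is illustrative only: real entries would live in the history store local to each TaskManager, not a Python list, and the class name is hypothetical.

```python
import time
from typing import List, Optional, Tuple


class HistoryStore:
    """Append-only store with TTL-based expiry and last-n retrieval."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl = ttl_seconds
        self._entries: List[Tuple[float, str]] = []  # (timestamp, memory)

    def append(self, memory: str, now: Optional[float] = None) -> None:
        self._entries.append((now if now is not None else time.time(), memory))

    def get_history(self, n: int, now: Optional[float] = None) -> List[str]:
        now = now if now is not None else time.time()
        # Drop entries older than the TTL, then keep the last n in time order
        live = [m for ts, m in self._entries if now - ts <= self.ttl]
        return live[-n:]


store = HistoryStore(ttl_seconds=60.0)
store.append("greeted user", now=0.0)
store.append("user asked about Flink", now=30.0)
store.append("answered question", now=50.0)
# At t=70s the first entry has expired
```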
Long-term memory provides semantic search capabilities using a vector storage backend. It stores memories with embeddings for similarity-based retrieval.
Characteristics:
- Storage: Vector storage backend
- Retention: Persistent, configurable TTL
- Access Pattern: Semantic search, similarity-based
- Performance: Optimized for search operations
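Similarity-based retrieval ultimately reduces to nearest-neighbour search over embeddings. A minimal cosine-similarity version, standing in for the Lucene-backed vector store (function names and the toy two-dimensional embeddings are illustrative):

```python
import math
from typing import Dict, List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two vectors (0.0 for zero vectors)."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0


def search(index: Dict[str, List[float]], query: List[float],
           limit: int) -> List[Tuple[str, float]]:
    """Rank stored vectors by cosine similarity to the query vector."""
    scored = [(mid, cosine(vec, query)) for mid, vec in index.items()]
    scored.sort(key=lambda p: p[1], reverse=True)
    return scored[:limit]


index = {
    "likes-tea": [1.0, 0.0],
    "likes-coffee": [0.9, 0.1],
    "owns-a-dog": [0.0, 1.0],
}
results = search(index, query=[1.0, 0.05], limit=2)
```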
The Memory API provides a unified interface for different types of memories. Sensory memory is not exposed through the API and operates automatically in the background.
class MemoryItem(BaseModel):
id: str = Field(..., description="The unique identifier for the text data")
memory: str = Field(
..., description="The memory deduced from the text data"
)
metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata for the text data")
created_at: Optional[str] = Field(None, description="The timestamp when the memory was created")
updated_at: Optional[str] = Field(None, description="The timestamp when the memory was updated")
class MemoryAPI(ABC):
"""
Core memory interface for Flink agents.
This API provides access to different types of memories:
- Short-term memory: Recent history via get_history(n) method
- Long-term memory: Semantic search via search() method
Note: Sensory memory is internal and not exposed through this API.
"""
@abstractmethod
def add(self, memory: MemoryItem) -> str:
"""Store a memory item"""
pass
@abstractmethod
def update(self, memory_id: str, data: MemoryItem) -> MemoryItem:
"""Update a memory by ID"""
pass
@abstractmethod
def delete(self, memory_id: str) -> None:
"""Delete a memory by ID"""
pass
@abstractmethod
def get(self, memory_id: str) -> Optional[MemoryItem]:
"""Retrieve a memory by ID"""
pass
@abstractmethod
def get_history(self, n: int = 10) -> List[MemoryItem]:
"""Get last n memories"""
pass
@abstractmethod
def search(self, query: str, limit: int = 10) -> List[MemoryItem]:
"""Search memories by semantic similarity"""
        pass

The following sequence illustrates the memory write path:

sequenceDiagram
participant Agent
participant Memory
participant LLM
participant Embeddings
participant VectorStore
participant HistoryStore
Agent->>Memory: add(messages, agent_id)
Memory->>LLM: extract_facts(messages)
LLM-->>Memory: extracted_facts[]
loop For each fact
Memory->>Embeddings: embed(fact)
Embeddings-->>Memory: vector
Memory->>VectorStore: search_similar(vector)
VectorStore-->>Memory: existing_memories[]
end
Memory->>LLM: update_memory_decisions(facts, existing)
LLM-->>Memory: actions[ADD/UPDATE/DELETE]
loop For each action
alt ADD
Memory->>VectorStore: insert(vector, metadata)
Memory->>HistoryStore: add_history(memory_id, null, new_memory, "ADD")
else UPDATE
Memory->>VectorStore: update(memory_id, vector, metadata)
Memory->>HistoryStore: add_history(memory_id, old_memory, new_memory, "UPDATE")
else DELETE
Memory->>VectorStore: delete(memory_id)
Memory->>HistoryStore: add_history(memory_id, old_memory, null, "DELETE")
end
end
    Memory-->>Agent: memory_results[]
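The write path above can be sketched with the LLM and embedding calls stubbed out. All helper names here are hypothetical, and the sketch only performs ADD actions; a real implementation would also ask the LLM to decide between ADD/UPDATE/DELETE against similar existing memories.

```python
from typing import Callable, Dict, List, Tuple


def add_memories(
    messages: List[str],
    extract_facts: Callable[[List[str]], List[str]],  # stands in for the LLM
    embed: Callable[[str], List[float]],              # stands in for the embedder
    vector_store: Dict[str, List[float]],
    history: List[Tuple[str, str]],                   # (memory_id, op) audit log
) -> List[str]:
    """Extract facts from messages, embed each one, and record ADD actions
    in both the vector store and the history store."""
    stored = []
    for fact in extract_facts(messages):
        memory_id = f"mem-{len(vector_store) + 1}"
        vector_store[memory_id] = embed(fact)
        history.append((memory_id, "ADD"))
        stored.append(memory_id)
    return stored


vector_store: Dict[str, List[float]] = {}
history: List[Tuple[str, str]] = []
ids = add_memories(
    ["User: I prefer tea over coffee."],
    extract_facts=lambda msgs: ["prefers tea"],  # stubbed fact extraction
    embed=lambda text: [float(len(text))],       # stubbed embedding
    vector_store=vector_store,
    history=history,
)
```

Note how every vector-store mutation is mirrored by a history-store entry, which is what makes the audit trail in the diagram possible.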
The memory search path follows a similar flow:

sequenceDiagram
participant Agent
participant Memory
participant Embeddings
participant VectorStore
Agent->>Memory: search(query)
Memory->>Embeddings: embed(query)
Embeddings-->>Memory: query_vector
Memory->>VectorStore: search(query_vector, filters)
VectorStore-->>Memory: similar_memories[]
Memory-->>Agent: search_results[]
Knowledge is a special type of long-term memory that is shared across all Agent instances. It provides access to external, pre-built knowledge sources through remote vector database connections, offering domain-specific knowledge that can enhance agent responses. Unlike individual agent memories, knowledge is globally accessible and persistent across the entire Flink cluster. Since knowledge is independent from a particular agent, it does not require Flink state checkpointing.
graph TB
subgraph "Flink Cluster"
subgraph "Agent Instances"
A1[Agent 1]
A2[Agent 2]
AN[Agent N]
end
end
subgraph "External Knowledge Store"
KB1[Knowledge Base 1<br/>Domain A]
KB2[Knowledge Base 2<br/>Domain B]
KB3[Knowledge Base N<br/>Domain N]
end
A1 -.-> KB1
A1 -.-> KB2
A1 -.-> KB3
A2 -.-> KB1
A2 -.-> KB2
A2 -.-> KB3
AN -.-> KB1
AN -.-> KB2
AN -.-> KB3
style KB1 fill:#fff3e0
style KB2 fill:#fff3e0
style KB3 fill:#fff3e0
class KnowledgeBase:
"""
Shared knowledge base integration.
Provides access to pre-built knowledge sources through remote
vector database connections. This is a special type of long-term
memory that is shared across all agent instances in the cluster.
"""
def __init__(self, service_url: str, api_key: str = None):
self.service_url = service_url
self.api_key = api_key
self.client = VectorDBClient(service_url, api_key)
def search(self, query: str, knowledge_source: str = None, limit: int = 10) -> List[KnowledgeItem]:
"""
Search external knowledge base.
Args:
query: Search query string
knowledge_source: Specific knowledge source to search (optional)
limit: Maximum number of results to return
Returns:
List of knowledge items with relevance scores
"""
response = self.client.search(
query=query,
source=knowledge_source,
limit=limit
)
return [KnowledgeItem.from_response(item) for item in response.results]
def get_sources(self) -> List[str]:
"""Get available knowledge sources"""
return self.client.list_sources()
def get_source_info(self, source_name: str) -> Dict[str, Any]:
"""Get information about a specific knowledge source"""
return self.client.get_source_info(source_name)
class KnowledgeItem:
"""Represents a knowledge item from external sources"""
def __init__(self, content: str, source: str, score: float, metadata: Dict[str, Any] = None):
self.content = content
self.source = source
self.score = score
        self.metadata = metadata or {}

    @classmethod
    def from_response(cls, item: Dict[str, Any]) -> "KnowledgeItem":
        """Build a KnowledgeItem from one raw search result entry.

        Assumes the entry is dict-like with content/source/score keys.
        """
        return cls(
            content=item.get("content", ""),
            source=item.get("source", ""),
            score=item.get("score", 0.0),
            metadata=item.get("metadata"),
        )

class AgentWithMemory(Agent):
"""Flink agent with shared memory and knowledge integration"""
def __init__(self, agent_id: str, knowledge_bases: List[KnowledgeBase] = None):
super().__init__(agent_id)
self.knowledge_bases = knowledge_bases or []
def search_knowledge(self, query: str, source: str = None, limit: int = 10) -> List[KnowledgeItem]:
"""
Search across all available shared knowledge bases.
Args:
query: Search query
source: Specific knowledge source (optional)
limit: Maximum results per source
Returns:
Combined results from all shared knowledge bases
"""
all_results = []
        for kb in self.knowledge_bases:
            if source is None or source in kb.get_sources():
results = kb.search(query, limit=limit)
all_results.extend(results)
# Sort by relevance score
all_results.sort(key=lambda x: x.score, reverse=True)
return all_results[:limit]
def process_event(self, event: AgentEvent):
"""Process event with memory and knowledge integration"""
# Get memory context
recent_history = self.memory.get_history(5)
relevant_memories = self.memory.search(event.content, limit=10)
# Get knowledge context
knowledge_context = self.search_knowledge(event.content, limit=5)
# Generate response with all context
response = self.generate_response(
event,
recent_history,
relevant_memories,
knowledge_context
)
# Store interaction
interaction_memory = MemoryItem(
id=f"interaction_{time.time()}",
memory=f"Event: {event.content}, Response: {response}",
metadata={
"timestamp": time.time(),
"agent_id": self.agent_id,
},
created_at=str(time.time())
)
        self.memory.add(interaction_memory)
        return response

- Memory Compression: Automatic compression of old memories
- Memory Consolidation: Automatic consolidation of similar memories
- Memory Forgetting: Configurable forgetting mechanisms
- Cross-Agent Memory Sharing: Shared memory between related agents
- Memory Visualization: Web UI for memory exploration
- Adaptive Memory Management: Dynamic adjustment of memory allocation
- Memory Quality Assessment: Automatic assessment of memory relevance
- Memory Synthesis: Automatic generation of higher-level memories
- Memory Transfer: Transfer of memories between different agent instances
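The score-based merge performed by `search_knowledge` above can be exercised in isolation with stub knowledge bases; the `StubItem` class and `merge_results` helper below are illustrative, not part of the proposed API:

```python
from typing import List


class StubItem:
    """Minimal stand-in for KnowledgeItem (content, source, score)."""

    def __init__(self, content: str, source: str, score: float) -> None:
        self.content = content
        self.source = source
        self.score = score


def merge_results(per_source: List[List[StubItem]], limit: int) -> List[StubItem]:
    """Combine results from several knowledge bases and keep the
    globally highest-scoring items, mirroring search_knowledge."""
    merged = [item for results in per_source for item in results]
    merged.sort(key=lambda x: x.score, reverse=True)
    return merged[:limit]


kb_a = [StubItem("Flink checkpointing", "domain-a", 0.9),
        StubItem("Flink state backends", "domain-a", 0.7)]
kb_b = [StubItem("Lucene indexing", "domain-b", 0.8)]
top = merge_results([kb_a, kb_b], limit=2)
```

Because each knowledge base ranks independently, a global re-sort by score is needed before truncating to the overall limit.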
The Flink-Agents Memory System introduces a hybrid state backend that combines RocksDB and Lucene instances within a unified state management framework. This dual-engine approach removes the need for an external vector database while serving both historical data storage and semantic search from the same state layer.
- Hybrid State Backend: The core innovation combining RocksDB instances for historical data and Lucene instances for vector search
- Unified Interface: Single API that automatically routes operations to the appropriate storage engine
- Distributed Architecture: Each TaskManager hosts its own hybrid backend with isolated RocksDB and Lucene instances
- Coordinated Checkpointing: Ensures consistent state recovery across both storage engines
The system is designed to be:
- Scalable: Grows with Flink cluster size through distributed hybrid state backends
- Fault-tolerant: Leverages coordinated checkpointing of both RocksDB and Lucene instances
- Performant: Optimized storage and search through specialized backend instances
- Extensible: Easy to add new memory types and storage backends
- Observable: Comprehensive monitoring and metrics for both backend instances
This design provides a solid foundation for building sophisticated, memory-aware agents in Apache Flink environments, with the hybrid state backend serving as the cornerstone of the architecture.