Apache Flink Agents is an Agentic AI framework built on top of Apache Flink that provides a distributed, real-time, event-driven architecture for building intelligent agent systems. This document provides a comprehensive overview of the architecture, components, and design patterns used in the framework.
- Overview
- Core Architecture
- Module Structure
- Event-Driven Architecture
- Multi-Language Support
- Runtime Execution
- Runtime Architecture
- Component Interactions
- Examples and Usage Patterns
- Design Patterns
Apache Flink Agents is designed to enable the development of intelligent agent systems that can process real-time data streams, make decisions, and take actions based on events. The framework leverages Apache Flink's distributed stream processing capabilities to provide:
- Event-driven architecture for reactive agent behavior
- Multi-language support (Java and Python) for flexibility
- Distributed execution for scalability
- Real-time processing for low-latency responses
- Fault tolerance inherited from Apache Flink
The architecture follows a layered approach with clear separation of concerns:
graph TB
subgraph "User Layer"
UA[User Agents]
UE[User Events]
UF[User Functions]
end
subgraph "API Layer"
API[flink-agents-api]
AE[Agent Interface]
EV[Event System]
RC[RunnerContext]
DC[Decorators]
end
subgraph "Plan Layer"
PL[flink-agents-plan]
AP[AgentPlan]
AC[Action]
FN[Function]
SER[Serialization]
end
subgraph "Runtime Layer"
RT[flink-agents-runtime]
OPR[Operators]
ENV[Environment]
CTX[Context]
PYE[Python Executor]
end
subgraph "Flink Core"
FC[Apache Flink]
FS[Flink Streaming]
FR[Flink Runtime]
end
UA --> API
UE --> API
UF --> API
API --> PL
PL --> RT
RT --> FC
style UA fill:#e1f5fe
style API fill:#f3e5f5
style PL fill:#e8f5e8
style RT fill:#fff3e0
style FC fill:#ffebee
The API module provides the core abstractions and interfaces for building agents:
- Agent: Base class for defining agent logic
- Event: Base class for all events in the system
- RunnerContext: Interface for agent execution context
- Decorators: Python decorators for action definition
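Taken together, these abstractions compose as shown in the minimal sketch below. All definitions here are illustrative stand-ins, not the real flink-agents classes; only the shape of the API (`Agent`, `Event`, `RunnerContext`, `@action`) follows this document.

```python
from typing import Callable, List, Type


class Event:
    """Stand-in for the API module's Event base class."""


class InputEvent(Event):
    def __init__(self, input):
        self.input = input


class OutputEvent(Event):
    def __init__(self, output):
        self.output = output


class RunnerContext:
    """Stand-in context: collects events an action emits."""

    def __init__(self):
        self._pending: List[Event] = []

    def send_event(self, event: Event) -> None:
        self._pending.append(event)


def action(event_type: Type[Event]) -> Callable:
    """Stand-in for the @action decorator: tags a method with the
    event type it listens to."""
    def wrap(func):
        target = func.__func__ if isinstance(func, staticmethod) else func
        target._listens_to = event_type
        return func
    return wrap


class Agent:
    """Stand-in base class for agents."""


class EchoAgent(Agent):
    @action(InputEvent)
    @staticmethod
    def on_input(event: Event, ctx: RunnerContext):
        ctx.send_event(OutputEvent(output=event.input.upper()))
```

Calling `EchoAgent.on_input(InputEvent('hi'), ctx)` leaves one `OutputEvent` in the context's pending list, which the runtime later drains.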
The plan module handles the compilation and serialization of agent definitions:
- AgentPlan: Compiled representation of an agent
- Action: Represents an agent action with event listeners
- Function: Abstraction for executable functions (Java/Python)
- Serialization: JSON serialization/deserialization support
The runtime module provides the execution engine for agents:
- Operators: Flink operators for agent execution
- Environment: Python environment management
- Context: Runtime context implementations
- Executors: Python action execution support
The examples module contains example implementations and usage patterns for reference.
The framework is built around an event-driven architecture where agents react to events by executing actions:
graph LR
subgraph "Event Flow"
IE[InputEvent] --> A1[Action 1]
IE --> A2[Action 2]
A1 --> ME[MyEvent]
A2 --> OE[OutputEvent]
ME --> A3[Action 3]
A3 --> OE2[OutputEvent]
end
subgraph "Agent Definition"
AG[Agent Class]
AG --> AC1["@action(InputEvent)"]
AG --> AC2["@action(MyEvent)"]
AC1 --> F1[first_action]
AC2 --> F2[second_action]
end
style IE fill:#e3f2fd
style ME fill:#f3e5f5
style OE fill:#e8f5e8
style OE2 fill:#e8f5e8
- InputEvent: Represents incoming data from upstream sources
- OutputEvent: Represents data to be sent downstream
- Custom Events: User-defined events for internal agent communication
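A custom event for passing intermediate results between actions might look like the following sketch. `ReviewScoredEvent` and its fields are hypothetical, and a plain dataclass stands in for the framework's `Event` base class:

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for the framework's Event base class."""


@dataclass
class ReviewScoredEvent(Event):
    """Hypothetical custom event: carries an intermediate result from
    one action to the next within the same agent."""
    review_id: str
    score: float
```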
Actions are defined using decorators in Python:
```python
@action(InputEvent)
@staticmethod
def process_input(event: Event, ctx: RunnerContext):
    # Process the input event and emit the result downstream
    result = event.input
    ctx.send_event(OutputEvent(output=result))
```

The framework supports both Java and Python through a dedicated integration layer:
graph TB
subgraph "Java Side"
JA[Java Agent]
JF[JavaFunction]
JE[Java Events]
JC[Java Context]
end
subgraph "Python Side"
PA[Python Agent]
PF[PythonFunction]
PE[Python Events]
PC[Python Context]
end
subgraph "Integration Layer"
PEM[Pemja Library]
SER[Serialization]
ENV[Python Environment]
EXE[Python Executor]
end
JA --> PEM
PA --> PEM
PEM --> SER
SER --> ENV
ENV --> EXE
JF -.-> PF
JE -.-> PE
JC -.-> PC
style PEM fill:#fff3e0
style SER fill:#f3e5f5
- Pemja Library: Enables Java-Python interoperability
- Embedded Python Environment: Manages Python interpreter lifecycle
- Serialization: Uses CloudPickle for object serialization
- Python Action Executor: Executes Python functions from Java runtime
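The serialization hand-off can be sketched as a simple round-trip. The framework uses CloudPickle; stdlib `pickle` stands in here because it shares the same `dumps`/`loads` API (CloudPickle additionally handles lambdas, closures, and locally defined classes). The function names are illustrative:

```python
import pickle


def to_wire(event) -> bytes:
    """Serialize a Python event for the hand-off across the Pemja
    boundary (CloudPickle in the real framework)."""
    return pickle.dumps(event)


def from_wire(payload: bytes):
    """Reconstruct the event object on the receiving side."""
    return pickle.loads(payload)
```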
The runtime execution follows a multi-stage pipeline:
graph TB
subgraph "Data Flow"
DS[Data Stream] --> KS[Keyed Stream]
KS --> AEO[ActionExecutionOperator]
AEO --> OS[Output Stream]
end
subgraph "ActionExecutionOperator"
IE[Input Event] --> EQ[Event Queue]
EQ --> AL[Action Lookup]
AL --> AE[Action Execution]
AE --> EC[Event Collection]
EC --> EQ
EC --> OE[Output Events]
end
subgraph "Action Execution"
JX[Java Execution]
PX[Python Execution]
AE --> JX
AE --> PX
JX --> RC[RunnerContext]
PX --> PE[Python Executor]
PE --> RC
end
style AEO fill:#e3f2fd
style JX fill:#e8f5e8
style PX fill:#fff3e0
- Input Processing: Incoming data is wrapped in InputEvent
- Event Queue: Events are queued for processing
- Action Lookup: Find actions that listen to the event type
- Action Execution: Execute actions (Java or Python)
- Event Collection: Collect generated events
- Output Processing: Send OutputEvents downstream
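The six steps above can be sketched as a single-threaded loop. The event and action types below are illustrative stand-ins, not the operator's real code:

```python
from collections import deque


class InputEvent:
    def __init__(self, value):
        self.value = value


class OutputEvent:
    def __init__(self, output):
        self.output = output


def run_agent(raw_input, actions_by_event):
    """actions_by_event: {event class: [callables returning event lists]}."""
    queue = deque([InputEvent(raw_input)])      # steps 1-2: wrap and enqueue
    outputs = []
    while queue:
        event = queue.popleft()
        for act in actions_by_event.get(type(event), []):  # step 3: lookup
            for produced in act(event):                    # step 4: execute
                if isinstance(produced, OutputEvent):      # step 6: downstream
                    outputs.append(produced)
                else:                                      # step 5: re-enqueue
                    queue.append(produced)
    return outputs
```

Note that non-output events are pushed back onto the queue, so one input can trigger a chain of actions before anything is emitted downstream.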
The runtime module (flink-agents-runtime) is the core execution engine that bridges the high-level agent definitions with Apache Flink's distributed streaming infrastructure. It provides sophisticated components for multi-language execution, state management, and feedback mechanisms.
The runtime module is organized into several key packages, each serving specific architectural responsibilities:
graph TB
subgraph "Runtime Module Structure"
OP[operator/]
PY[python/]
ENV[env/]
CTX[context/]
FB[feedback/]
LOG[logger/]
MSG[message/]
QUE[queue/]
UTL[utils/]
COM[common/]
CU[CompileUtils.java]
end
subgraph "Core Execution"
OP --> AEO[ActionExecutionOperator]
OP --> AEOP[ActionExecutionOperatorFactory]
end
subgraph "Python Integration"
PY --> PAE[PythonActionExecutor]
PY --> PRC[PythonRunnerContext]
PY --> PEV[PythonEvent]
ENV --> EPE[EmbeddedPythonEnvironment]
ENV --> PEM[PythonEnvironmentManager]
end
subgraph "State & Feedback"
FB --> FC[FeedbackChannel]
FB --> FCB[FeedbackChannelBroker]
LOG --> FBL[FeedbackLogger]
LOG --> KGS[KeyGroupStream]
end
subgraph "Communication"
MSG --> EM[EventMessage]
MSG --> CM[CheckpointMessage]
QUE --> MPQ[MpscQueue]
end
style OP fill:#e3f2fd
style PY fill:#fff3e0
style FB fill:#e8f5e8
style MSG fill:#f3e5f5
The ActionExecutionOperator is the heart of the runtime execution engine:
graph TB
subgraph "ActionExecutionOperator Lifecycle"
INIT[Initialize] --> OPEN[Open]
OPEN --> PROC[Process Elements]
PROC --> CLOSE[Close]
end
subgraph "Processing Pipeline"
INPUT[Input Data] --> WRAP[Wrap to InputEvent]
WRAP --> QUEUE[Event Queue]
QUEUE --> LOOKUP[Action Lookup]
LOOKUP --> EXEC[Execute Actions]
EXEC --> COLLECT[Collect Events]
COLLECT --> OUTPUT[Output Events]
COLLECT --> QUEUE
end
subgraph "Multi-Language Execution"
EXEC --> JAVA[Java Execution]
EXEC --> PYTHON[Python Execution]
JAVA --> JCTX[Java RunnerContext]
PYTHON --> PEXEC[PythonActionExecutor]
PEXEC --> PCTX[Python RunnerContext]
end
style EXEC fill:#e3f2fd
style JAVA fill:#e8f5e8
style PYTHON fill:#fff3e0
Key Responsibilities:
- Event Processing: Converts input data to InputEvents and manages event flow
- Action Dispatching: Looks up and executes actions based on event types
- Multi-Language Support: Handles both Java and Python action execution
- State Management: Manages execution context and event collection
- Output Generation: Processes OutputEvents and sends data downstream
The Python integration provides seamless execution of Python code within the Java runtime:
graph TB
subgraph "Python Integration Architecture"
PEM[PythonEnvironmentManager] --> EPE[EmbeddedPythonEnvironment]
EPE --> PI[PythonInterpreter]
PAE[PythonActionExecutor] --> PEM
PAE --> PRC[PythonRunnerContextImpl]
PAE --> PI
end
subgraph "Python Execution Flow"
PF[PythonFunction] --> PAE
PAE --> LOAD[Load Python Modules]
LOAD --> INIT[Initialize Context]
INIT --> EXEC[Execute Function]
EXEC --> COLLECT[Collect Events]
end
subgraph "Serialization Bridge"
JE[Java Event] --> SER[Serialize]
SER --> PE[Python Event]
PE --> DESER[Deserialize]
DESER --> PO[Python Object]
end
style PAE fill:#fff3e0
style PEM fill:#f3e5f5
style SER fill:#e8f5e8
Key Components:
- PythonEnvironmentManager: Manages Python interpreter lifecycle and configuration
- EmbeddedPythonEnvironment: Provides embedded Python execution environment
- PythonActionExecutor: Executes Python functions and manages event conversion
- PythonRunnerContextImpl: Specialized context for Python action execution
- Serialization Layer: Uses CloudPickle for Java-Python object conversion
The runtime includes feedback mechanisms adapted from Flink StateFun:
graph TB
subgraph "Feedback Architecture"
FBK[FeedbackKey] --> SFBK[SubtaskFeedbackKey]
SFBK --> FBC[FeedbackChannel]
FBC --> FBQ[FeedbackQueue]
FCB[FeedbackChannelBroker] --> FBC
end
subgraph "Logging & Checkpointing"
FBL[FeedbackLogger] --> KGS[KeyGroupStream]
KGS --> CSO[CheckpointedStreamOperations]
CHKP[Checkpoints] --> FBL
end
subgraph "Message System"
MSG[Message] --> EM[EventMessage]
MSG --> CM[CheckpointMessage]
MPQ[MpscQueue] --> MSG
end
style FBC fill:#e8f5e8
style FBL fill:#f3e5f5
style MSG fill:#e3f2fd
Key Features:
- Feedback Channels: Enable communication between distributed components
- Checkpointed Logging: Provides fault-tolerant state management
- Message System: Handles both data and control messages
- Lock-Free Queues: Optimized for high-throughput message passing
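The multi-producer/single-consumer hand-off that the feedback channel relies on can be sketched as follows. `collections.deque` stands in for the runtime's lock-free `MpscQueue` (its `append` is atomic under the GIL); the real Java implementation uses lock-free linked nodes:

```python
import threading
from collections import deque


class MpscQueue:
    """Many producer threads add; a single consumer drains."""

    def __init__(self):
        self._items = deque()

    def add(self, item):
        # Safe to call from multiple producer threads concurrently.
        self._items.append(item)

    def drain_all(self):
        # Called only by the single consumer thread.
        drained = []
        while True:
            try:
                drained.append(self._items.popleft())
            except IndexError:
                return drained
```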
The runtime provides different context implementations for various execution scenarios:
graph TB
subgraph "Context Hierarchy"
RC[RunnerContext] --> RCI[RunnerContextImpl]
RCI --> PRCI[PythonRunnerContextImpl]
end
subgraph "Context Operations"
SE[Send Event] --> PE[Pending Events]
PE --> DE[Drain Events]
DE --> CP[Check Pending]
end
subgraph "Event Flow"
ACTION[Action Execution] --> CTX[Context]
CTX --> EVENTS[Event Collection]
EVENTS --> NEXT[Next Actions]
end
style RC fill:#e3f2fd
style CTX fill:#f3e5f5
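The send/drain contract shown above can be sketched as follows; the method names mirror the document, but the implementation is illustrative:

```python
class RunnerContextImpl:
    """Sketch of the context's pending-event bookkeeping."""

    def __init__(self):
        self._pending = []

    def send_event(self, event):
        # Actions call this; events accumulate until drained.
        self._pending.append(event)

    def drain_events(self):
        """Hand all pending events to the operator and reset."""
        drained, self._pending = self._pending, []
        return drained

    def check_no_pending_events(self):
        """Guard used before starting a new action execution."""
        if self._pending:
            raise RuntimeError("context still holds undrained events")
```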
The complete runtime execution proceeds as follows:
sequenceDiagram
participant DS as DataStream
participant AEO as ActionExecutionOperator
participant PAE as PythonActionExecutor
participant PEM as PythonEnvironmentManager
participant CTX as RunnerContext
participant FB as FeedbackChannel
DS->>AEO: Stream Element
AEO->>AEO: Wrap to InputEvent
AEO->>AEO: Lookup Actions
alt Java Action
AEO->>CTX: Execute Java Function
CTX->>AEO: Return Events
else Python Action
AEO->>PAE: Execute Python Function
PAE->>PEM: Get Python Environment
PEM->>PAE: Python Interpreter
PAE->>CTX: Execute with Context
CTX->>PAE: Return Events
PAE->>AEO: Return Events
end
AEO->>AEO: Process Generated Events
AEO->>DS: Emit Output Events
opt Feedback Required
AEO->>FB: Send Feedback
FB->>AEO: Feedback Response
end
The runtime includes several performance optimizations:
- MpscQueue: Multi-producer, single-consumer queue for high-throughput message passing
- Atomic Operations: Minimize contention in concurrent scenarios
- Object Pooling: Reuse of StreamRecord objects and other frequently allocated objects
- Efficient Serialization: Optimized serialization paths for Python-Java communication
- Lazy Initialization: Python environments are only created when needed
- Batch Processing: Events are processed in batches to reduce overhead
- Chaining Strategy: Always chains operators for optimal performance
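The object-pooling idea behind the StreamRecord reuse can be sketched as follows. The runtime's pool is on the Java side; this only illustrates the pattern:

```python
from collections import deque


class RecordPool:
    """Reuse record objects instead of allocating per element."""

    def __init__(self):
        self._free = deque()

    def acquire(self, value):
        # Reuse a released record if available, else allocate.
        record = self._free.pop() if self._free else {"value": None}
        record["value"] = value
        return record

    def release(self, record):
        # Clear and return the record to the pool for reuse.
        record["value"] = None
        self._free.append(record)
```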
The runtime inherits and extends Flink's fault tolerance mechanisms:
- State Persistence: All runtime state is checkpointed
- Feedback Logging: Feedback data is durably logged and checkpointed
- Recovery: Automatic recovery from failures with consistent state
- Exception Propagation: Proper error handling and propagation
- Resource Cleanup: Automatic cleanup of Python environments and resources
- Graceful Degradation: Continues operation when possible during partial failures
The runtime module integrates with other components through well-defined interfaces:
- Implements RunnerContext interface for action execution
- Supports all Event types defined in the API module
- Executes AgentPlan compiled from agent definitions
- Supports both JavaFunction and PythonFunction execution
- Built on Flink's streaming operators and runtime
- Leverages Flink's distributed execution and fault tolerance
- Integrates with Flink's checkpointing and state management
The runtime's execution of an AgentPlan follows a multi-stage process that transforms high-level agent definitions into distributed streaming computations:
graph TB
subgraph "Plan Compilation"
UA[User Agent] --> AP["AgentPlan.from_agent()"]
AP --> JSON[JSON Serialization]
JSON --> DIST[Distribute to Runtime]
end
subgraph "Runtime Deployment"
DIST --> AEO[ActionExecutionOperator]
AEO --> INIT[Initialize Components]
INIT --> READY[Ready for Execution]
end
subgraph "Plan Structure"
ACTIONS["actions: Map<String, Action>"]
EVENTS["actions_by_event: Map<String, List<String>>"]
ACTIONS --> LOOKUP[Action Lookup Table]
EVENTS --> TRIGGER[Event Trigger Mapping]
end
style AP fill:#e3f2fd
style AEO fill:#fff3e0
style LOOKUP fill:#e8f5e8
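What `AgentPlan.from_agent()` produces can be sketched as the two lookup maps in the "Plan Structure" diagram. The scan below is illustrative (the real compiler also serializes each action's function for distribution), and the `@action` decorator here is a stand-in:

```python
def action(event_type):
    """Stand-in for the API module's @action decorator."""
    def wrap(func):
        target = func.__func__ if isinstance(func, staticmethod) else func
        target._listens_to = event_type
        return func
    return wrap


class InputEvent:
    """Stand-in input event type."""


class MyAgent:
    @action(InputEvent)
    @staticmethod
    def first_action(event, ctx):
        pass


def compile_plan(agent_cls):
    """Build the two lookup maps from the 'Plan Structure' diagram."""
    actions = {}           # action name -> callable
    actions_by_event = {}  # event-type FQCN -> [action names]
    for name, member in vars(agent_cls).items():
        func = member.__func__ if isinstance(member, staticmethod) else member
        event_type = getattr(func, "_listens_to", None)
        if event_type is None:
            continue
        actions[name] = func
        fqcn = f"{event_type.__module__}.{event_type.__qualname__}"
        actions_by_event.setdefault(fqcn, []).append(name)
    return actions, actions_by_event
```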
When the ActionExecutionOperator starts, it follows this initialization sequence:
sequenceDiagram
participant Flink as Flink Runtime
participant AEO as ActionExecutionOperator
participant PEM as PythonEnvironmentManager
participant PAE as PythonActionExecutor
participant CTX as RunnerContext
Flink->>AEO: open()
AEO->>AEO: Initialize StreamRecord pool
AEO->>CTX: Create RunnerContextImpl
AEO->>AEO: Check for Python actions
alt Contains Python Actions
AEO->>PEM: Create PythonEnvironmentManager
PEM->>PEM: Setup Python dependencies
AEO->>PAE: Create PythonActionExecutor
PAE->>PEM: Get EmbeddedPythonEnvironment
PEM->>PAE: Return Python interpreter
PAE->>PAE: Load Python modules
PAE->>PAE: Initialize Python context
end
AEO->>Flink: Ready for processing
The core execution loop processes each incoming data element through this pipeline:
graph TB
subgraph "Input Processing"
IN[Input Data] --> CHECK{Input Source?}
CHECK -->|Java| JIE[Java InputEvent]
CHECK -->|Python| PIE[Python InputEvent]
end
subgraph "Event Queue Management"
JIE --> EQ[Event Queue]
PIE --> EQ
EQ --> POP[Pop Event]
POP --> LOOKUP[Lookup Actions]
end
subgraph "Action Execution"
LOOKUP --> ACTIONS[List<Action>]
ACTIONS --> ITER[Iterate Actions]
ITER --> TYPE{Action Type?}
TYPE -->|Java| JEXEC[Execute Java Function]
TYPE -->|Python| PEXEC[Execute Python Function]
end
subgraph "Event Collection"
JEXEC --> JEVENTS[Collect Java Events]
PEXEC --> PEVENTS[Collect Python Events]
JEVENTS --> MERGE[Merge Events]
PEVENTS --> MERGE
MERGE --> FILTER{Event Type?}
FILTER -->|OutputEvent| OUTPUT[Send Downstream]
FILTER -->|Other| PUSH[Push to Queue]
PUSH --> EQ
end
style LOOKUP fill:#e3f2fd
style JEXEC fill:#e8f5e8
style PEXEC fill:#fff3e0
style OUTPUT fill:#f3e5f5
The runtime uses the AgentPlan's action mapping for efficient event routing:
graph LR
subgraph "AgentPlan Structure"
AP[AgentPlan]
AP --> AM[actions: Map<String, Action>]
AP --> AE[actions_by_event: Map<String, List<String>>]
end
subgraph "Lookup Process"
EVENT[Event] --> TYPE[Get Event Type]
TYPE --> FQCN[Fully Qualified Class Name]
FQCN --> AE
AE --> NAMES[Action Names List]
NAMES --> AM
AM --> ACTIONS[List<Action>]
end
subgraph "Execution"
ACTIONS --> DISPATCH[Dispatch to Executors]
DISPATCH --> JAVA[Java Actions]
DISPATCH --> PYTHON[Python Actions]
end
style FQCN fill:#e3f2fd
style DISPATCH fill:#fff3e0
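The lookup path in the diagram — event type, to fully qualified class name, to action names, to Action objects — reduces to two dictionary reads. A sketch with illustrative map contents:

```python
def lookup_actions(event, actions, actions_by_event):
    """Resolve an event to its actions via the two AgentPlan maps."""
    fqcn = f"{type(event).__module__}.{type(event).__qualname__}"
    names = actions_by_event.get(fqcn, [])    # FQCN -> action names
    return [actions[name] for name in names]  # names -> Action objects


class PingEvent:
    """Hypothetical event type for the demo below."""


actions = {"on_ping": lambda event, ctx: None}
actions_by_event = {
    f"{PingEvent.__module__}.{PingEvent.__qualname__}": ["on_ping"],
}
```

Because both maps are hash maps, routing an event is O(1) in the number of registered event types.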
For Java actions, the execution is straightforward:
```java
// Simplified Java action execution
if (action.getExec() instanceof JavaFunction) {
    action.getExec().call(event, runnerContext);
    actionOutputEvents = runnerContext.drainEvents();
}
```

sequenceDiagram
participant AEO as ActionExecutionOperator
participant JF as JavaFunction
participant CTX as RunnerContext
AEO->>JF: call(event, context)
JF->>JF: Execute business logic
JF->>CTX: sendEvent(outputEvent)
CTX->>CTX: Add to pending events
JF->>AEO: Return
AEO->>CTX: drainEvents()
CTX->>AEO: List<Event>
Python actions require more complex handling through the PythonActionExecutor:
sequenceDiagram
participant AEO as ActionExecutionOperator
participant PAE as PythonActionExecutor
participant PI as PythonInterpreter
participant PF as PythonFunction
participant PCTX as PythonRunnerContext
AEO->>PAE: executePythonFunction(function, event)
PAE->>PAE: Check no pending events
PAE->>PF: setInterpreter(interpreter)
PAE->>PI: invoke(CONVERT_TO_PYTHON_OBJECT, event)
PI->>PAE: Python event object
PAE->>PI: get(FLINK_RUNNER_CONTEXT_VAR_NAME)
PI->>PAE: Python context object
PAE->>PF: call(pythonEvent, pythonContext)
PF->>PF: Execute Python business logic
PF->>PCTX: send_event(output_event)
PCTX->>PCTX: Serialize and store event
PF->>PAE: Return
PAE->>PCTX: drainEvents()
PCTX->>PAE: List<Event>
PAE->>AEO: Return events
The runtime manages complex event flows through an internal event queue:
graph TB
subgraph "Event Flow Management"
START[Start Processing] --> QUEUE[Initialize Event Queue]
QUEUE --> PUSH[Push InputEvent]
PUSH --> LOOP{Queue Empty?}
LOOP -->|No| POP[Pop Event]
POP --> PROCESS[Process Event]
PROCESS --> COLLECT[Collect Output Events]
COLLECT --> CHECK{Has Events?}
CHECK -->|Yes| FILTER[Filter Events]
FILTER --> OUTPUT{OutputEvent?}
OUTPUT -->|Yes| EMIT[Emit Downstream]
OUTPUT -->|No| PUSH2[Push to Queue]
PUSH2 --> LOOP
EMIT --> LOOP
CHECK -->|No| LOOP
LOOP -->|Yes| END[End Processing]
end
style PROCESS fill:#e3f2fd
style EMIT fill:#e8f5e8
style LOOP fill:#fff3e0
The runtime includes comprehensive error handling:
graph TB
subgraph "Error Handling Strategy"
EXEC[Action Execution] --> ERROR{Exception?}
ERROR -->|No| SUCCESS[Collect Events]
ERROR -->|Yes| TYPE{Error Type?}
TYPE -->|Python| CLEANUP[Cleanup Python Context]
TYPE -->|Java| PROPAGATE[Propagate Exception]
CLEANUP --> LOG[Log Error]
PROPAGATE --> LOG
LOG --> RECOVER{Recoverable?}
RECOVER -->|Yes| RETRY[Retry Execution]
RECOVER -->|No| FAIL[Fail Task]
RETRY --> EXEC
end
style ERROR fill:#ffebee
style CLEANUP fill:#fff3e0
style FAIL fill:#ffcdd2
The runtime employs several optimizations during AgentPlan execution:
- Lazy Python Environment Creation: Python environments are only initialized when the first Python action is encountered
- Event Batching: Multiple events are processed in batches to reduce overhead
- Object Reuse: StreamRecord objects are reused to minimize garbage collection
- Efficient Lookup: Action lookup uses hash maps for O(1) event-to-action mapping
- Context Pooling: RunnerContext objects are reused across invocations
The AgentPlan execution integrates with Flink's checkpointing mechanism:
graph LR
subgraph "Checkpointing Flow"
CP[Checkpoint Trigger] --> STATE[Capture State]
STATE --> PYTHON[Python Environment State]
STATE --> CONTEXT[Context State]
STATE --> QUEUE[Queue State]
PYTHON --> PERSIST[Persist to Storage]
CONTEXT --> PERSIST
QUEUE --> PERSIST
end
subgraph "Recovery Flow"
RESTORE[Restore Trigger] --> LOAD[Load State]
LOAD --> INIT[Reinitialize Components]
INIT --> RESUME[Resume Processing]
end
style CP fill:#e3f2fd
style RESTORE fill:#f3e5f5
This comprehensive execution process ensures that AgentPlan definitions are efficiently and reliably executed in the distributed Flink environment, with full support for multi-language actions, complex event flows, and fault tolerance.
This runtime architecture provides a robust, scalable, and fault-tolerant foundation for executing intelligent agents in distributed streaming environments, while maintaining the flexibility to support multiple programming languages and complex event-driven workflows.
sequenceDiagram
participant User
participant Agent
participant AgentPlan
participant Action
participant Function
User->>Agent: Define agent class
Agent->>AgentPlan: from_agent()
AgentPlan->>Action: Extract actions
Action->>Function: Wrap functions
Function->>AgentPlan: Return compiled plan
AgentPlan->>User: Ready for execution
sequenceDiagram
participant Stream
participant Operator
participant Context
participant Executor
participant Agent
Stream->>Operator: Process element
Operator->>Context: Create context
Operator->>Executor: Execute action
Executor->>Agent: Call function
Agent->>Context: Send events
Context->>Operator: Return events
Operator->>Stream: Emit results
```python
class MyAgent(Agent):
    @action(InputEvent)
    @staticmethod
    def first_action(event: Event, ctx: RunnerContext):
        input_data = event.input
        processed = input_data + ' processed'
        ctx.send_event(MyEvent(value=processed))
        ctx.send_event(OutputEvent(output=processed))

    @action(MyEvent)
    @staticmethod
    def second_action(event: Event, ctx: RunnerContext):
        result = event.value + ' final'
        ctx.send_event(OutputEvent(output=result))
```

```python
env = AgentsExecutionEnvironment.get_execution_environment()
agent = MyAgent()
input_list = [
    {'key': 'user1', 'value': 'Hello'},
    {'key': 'user2', 'value': 'World'}
]
output_list = env.from_list(input_list).apply(agent).to_list()
env.execute()
```

- Agents react to events rather than polling
- Loose coupling between components
- Scalable and responsive architecture
- Actions are stateless functions
- Easy to test and reason about
- Supports both Java and Python implementations
- Separate compilation from execution
- Enables optimization and serialization
- Supports distributed deployment
- Provides execution context to actions
- Manages event sending and state access
- Abstracts runtime concerns from business logic
- Seamless Java-Python interoperability
- Unified event model across languages
- Shared serialization format
- Scalability: Built on Apache Flink's distributed architecture
- Real-time Processing: Low-latency event processing
- Fault Tolerance: Inherits Flink's fault tolerance mechanisms
- Flexibility: Support for both Java and Python
- Event-Driven: Reactive programming model
- Extensibility: Easy to add new event types and actions
Apache Flink Agents provides a robust, scalable, and flexible framework for building intelligent agent systems. Its event-driven architecture, multi-language support, and integration with Apache Flink make it suitable for a wide range of real-time AI applications, from simple data processing pipelines to complex multi-agent systems.
The framework's design emphasizes:
- Separation of concerns through layered architecture
- Flexibility through multi-language support
- Scalability through distributed execution
- Maintainability through clear abstractions and patterns
This architecture enables developers to focus on business logic while the framework handles the complexities of distributed execution, event management, and cross-language integration.