- Overview
- Architecture Components
- Checkpoint Lifecycle
- Barrier Mechanism
- State Snapshotting
- Recovery Process
- Configuration
- Performance Tuning
- Troubleshooting
Apache Flink's checkpointing mechanism provides fault tolerance by creating consistent snapshots of the application state at regular intervals. This enables the system to recover from failures while maintaining exactly-once processing guarantees.
- Checkpoint: An automatic snapshot of application state taken by Flink for fault tolerance
- Savepoint: A manual snapshot triggered by users for operational purposes
- State Backend: The storage mechanism for operator state during execution
- Checkpoint Storage: The persistent storage location for checkpoint data
- Barrier: A special marker that flows through the data stream to coordinate checkpoints
```mermaid
graph TB
subgraph "JobManager"
CC["Checkpoint Coordinator"]
CS["Checkpoint Storage"]
end
subgraph "TaskManager 1"
T1["Task 1"]
SB1["State Backend"]
SC1["Subtask Checkpoint Coordinator"]
end
subgraph "TaskManager 2"
T2["Task 2"]
SB2["State Backend"]
SC2["Subtask Checkpoint Coordinator"]
end
subgraph "External Storage"
FS["File System / S3 / HDFS"]
end
CC --> SC1
CC --> SC2
SC1 --> SB1
SC2 --> SB2
CS --> FS
style CC fill:#e1f5fe
style CS fill:#e8f5e8
style SB1 fill:#fff3e0
style SB2 fill:#fff3e0
```
- Checkpoint Coordinator (JobManager):
  - Triggers checkpoints at configured intervals
  - Manages the checkpoint lifecycle
  - Coordinates barrier injection and acknowledgments
  - Handles checkpoint completion and cleanup
- Subtask Checkpoint Coordinator (TaskManager):
  - Manages local checkpoint execution
  - Coordinates state snapshotting
  - Handles barrier alignment and processing
- State Backend:
  - Stores operator state during execution
  - Provides snapshotting capabilities
  - Manages state serialization/deserialization
- Checkpoint Storage (see the code sketch after this list):
  - Persists checkpoint metadata and data
  - Provides recovery capabilities
  - Manages checkpoint retention policies
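As a rough illustration of how the user-facing components map onto application code, the sketch below selects a state backend and a checkpoint storage location for a single job, while the Checkpoint Coordinator itself runs inside the JobManager and needs nothing beyond enabling checkpointing. This is a minimal sketch; package names for the RocksDB backend vary across Flink versions, the bucket path is a placeholder, and the full set of options appears in the Configuration section.

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointComponentsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State Backend: holds operator state locally while the job runs.
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Checkpoint Storage: where completed snapshots are persisted
        // (the S3 bucket path is a placeholder).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        // The Checkpoint Coordinator in the JobManager triggers checkpoints at
        // this interval; no further user code is required to participate.
        env.enableCheckpointing(60_000);
    }
}
```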
```mermaid
sequenceDiagram
participant CC as Checkpoint Coordinator
participant S as Source
participant O as Operator
participant Sink as Sink
participant CS as Checkpoint Storage
Note over CC: 1. Trigger Checkpoint
CC->>S: Trigger Checkpoint (ID: n)
Note over S: 2. Inject Barriers
S->>S: Record offset position
S->>O: Checkpoint Barrier (n)
Note over O: 3. Barrier Processing
O->>O: Wait for barriers from all inputs
O->>O: Snapshot state
O->>Sink: Forward Barrier (n)
Note over Sink: 4. Acknowledgment
Sink->>Sink: Receive all barriers
Sink->>CC: Acknowledge Checkpoint (n)
Note over CC: 5. Completion
CC->>CS: Persist checkpoint metadata
CC->>CC: Mark checkpoint complete
```
1. Trigger Phase:
   - Checkpoint Coordinator initiates the checkpoint with a unique ID
   - Triggers sent to all source operators
   - Checkpoint metadata created
2. Barrier Injection:
   - Sources record their current position/offset
   - Checkpoint barriers injected into the data streams
   - Barriers carry the checkpoint ID and a timestamp
3. Barrier Propagation:
   - Barriers flow downstream with the data records
   - Barriers never overtake records, so ordering is maintained
   - Barriers from multiple checkpoints can coexist in the stream
4. State Snapshotting:
   - Operators snapshot state once barriers have arrived on all inputs
   - State is serialized asynchronously to storage
   - Barriers forwarded to downstream operators
5. Acknowledgment:
   - Sink operators acknowledge checkpoint completion
   - Checkpoint Coordinator collects all acknowledgments
   - Checkpoint marked as complete (see the listener sketch after this list)
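User functions can observe the completion step of this lifecycle. The sketch below is a minimal, hypothetical operator that implements Flink's `CheckpointListener` interface so it is told when the Checkpoint Coordinator has finalized (or aborted) a checkpoint, a common place to commit external side effects. The class and method bodies are illustrative only, and the interface's package location differs slightly between Flink versions.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;

// Hypothetical operator that reacts to checkpoint completion notifications.
public class CommitOnCheckpointFunction extends RichMapFunction<String, String>
        implements CheckpointListener {

    @Override
    public String map(String value) {
        return value.toUpperCase();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Invoked after the Checkpoint Coordinator has marked checkpoint
        // `checkpointId` complete, e.g. commit a pending transaction here.
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // Invoked if the checkpoint was aborted; discard pending work.
    }
}
```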
```mermaid
graph LR
subgraph "Input Streams"
I1["Input 1: ...R1[B]R2..."]
I2["Input 2: ...R3[B]R4..."]
end
subgraph "Operator"
O["Operator<br/>Wait for barriers<br/>from all inputs"]
Buffer["Input Buffer"]
end
subgraph "Output"
Out["Output: ...R1,R3[B]R2,R4..."]
end
I1 --> Buffer
I2 --> Buffer
Buffer --> O
O --> Out
style O fill:#ffecb3
style Buffer fill:#e3f2fd
```
Alignment Process:
- Operator receives barrier from first input stream
- Blocks that input channel until barriers arrive from all inputs
- Buffers incoming records from blocked channels
- Once all barriers received, processes buffered records
- Takes state snapshot and forwards barriers
```mermaid
graph LR
subgraph "Input Streams"
I1["Input 1: ...R1[B]R2..."]
I2["Input 2: ...R3,R4..."]
end
subgraph "Operator"
O["Operator<br/>Process first barrier<br/>immediately"]
InFlight["In-flight Data<br/>becomes part of state"]
end
subgraph "Output"
Out["Output: ...R1[B]R3,R2..."]
end
I1 --> O
I2 --> O
O --> InFlight
O --> Out
style O fill:#c8e6c9
style InFlight fill:#fff3e0
```
Unaligned Process:
- Operator reacts to first barrier immediately
- Forwards barrier downstream without waiting
- In-flight data becomes part of operator state
- Reduces checkpoint alignment time
- Suitable for high-throughput scenarios (a configuration sketch follows this list)
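A hedged sketch of how an application might opt into this behavior: recent Flink versions let you enable unaligned checkpoints and, optionally, an aligned-checkpoint timeout so that barriers are aligned normally and the job only switches to the unaligned mode once alignment takes longer than the timeout. The method names below follow the newer `CheckpointConfig` API and may differ in older releases.

```java
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Let barriers overtake buffered records instead of waiting for alignment;
        // the skipped-over in-flight data is stored as part of the checkpoint.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Optional: start aligned and fall back to unaligned only if alignment
        // takes longer than 30 seconds (useful when backpressure is intermittent).
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
    }
}
```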
```mermaid
graph TB
subgraph "HashMapStateBackend"
HMS["Heap Memory Storage"]
HMSCP["Checkpoint to<br/>External Storage"]
end
subgraph "EmbeddedRocksDBStateBackend"
RDB["RocksDB<br/>Local Disk"]
RDBCP["Incremental<br/>Checkpoints"]
end
subgraph "External Storage"
FS["FileSystem<br/>S3/HDFS/GCS"]
Meta["Checkpoint<br/>Metadata"]
end
HMS --> HMSCP
RDB --> RDBCP
HMSCP --> FS
RDBCP --> FS
Meta --> FS
style HMS fill:#e1f5fe
style RDB fill:#fff3e0
style FS fill:#e8f5e8
```
1. Synchronous Phase (see the sketch after this list):
   - Operator briefly stops processing new records
   - Creates a consistent state snapshot
   - Blocking time is kept minimal
2. Asynchronous Phase:
   - State serialization to external storage
   - Operator continues processing
   - I/O runs as a background operation
3. Completion:
   - Snapshot persisted successfully
   - Acknowledgment sent to the Checkpoint Coordinator
   - Old state versions become eligible for cleanup
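To make the two phases concrete, here is a minimal, hypothetical buffering sink built on Flink's `CheckpointedFunction` interface: `snapshotState` captures a consistent view of the in-memory buffer during the synchronous phase (the state backend may then write it out asynchronously), and `initializeState` registers the state and repopulates the buffer after a recovery. Names and the flushing logic are placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical sink whose in-memory buffer survives failures via operator state.
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        // ... flush the buffer to the external system once it is large enough ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Synchronous phase: capture a consistent copy of the current buffer.
        checkpointedBuffer.update(new ArrayList<>(buffer));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedBuffer = context.getOperatorStateStore().getListState(descriptor);

        // On recovery, repopulate the in-memory buffer from the restored snapshot.
        if (context.isRestored()) {
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element);
            }
        }
    }
}
```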
```mermaid
graph TB
subgraph "Failure Detection"
FD["Task Failure<br/>Detected"]
RS["Restart Strategy<br/>Evaluation"]
end
subgraph "Recovery Planning"
LC["Latest Checkpoint<br/>Selection"]
RD["Recovery Decision<br/>Made"]
end
subgraph "State Restoration"
SR["State Restoration<br/>from Checkpoint"]
SO["Source Offset<br/>Reset"]
end
subgraph "Resumption"
RP["Resume Processing<br/>from Checkpoint"]
EO["Exactly-Once<br/>Guarantees"]
end
FD --> RS
RS --> LC
LC --> RD
RD --> SR
SR --> SO
SO --> RP
RP --> EO
style FD fill:#ffcdd2
style LC fill:#e8f5e8
style SR fill:#fff3e0
style RP fill:#c8e6c9
```
1. Failure Detection:
   - TaskManager failure or task exception
   - Restart strategy evaluation (see the sketch after this list)
   - Recovery decision made
2. Checkpoint Selection:
   - Latest completed checkpoint identified
   - Checkpoint metadata retrieved
   - Recovery plan created
3. State Restoration:
   - Operators restored with checkpoint state
   - Source positions reset to checkpoint offsets
   - Topology redeployed
4. Processing Resumption:
   - Data processing resumes from the checkpoint
   - Exactly-once guarantees maintained
   - Downstream systems remain consistent
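Which restart strategy is evaluated in step 1 is part of the job configuration. The sketch below uses the classic `RestartStrategies` API (newer Flink versions also expose the same settings as configuration options) to retry the job a few times before giving up; the attempt count and delay are illustrative values only.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing must be enabled, otherwise there is no state to restore from.
        env.enableCheckpointing(60_000);

        // On failure: retry at most 3 times, waiting 10 seconds between attempts.
        // Each attempt restores operator state and source offsets from the
        // latest completed checkpoint before processing resumes.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
    }
}
```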
```yaml
# Enable checkpointing
execution.checkpointing.interval: 60s
# Checkpoint mode
execution.checkpointing.mode: EXACTLY_ONCE
# Checkpoint timeout
execution.checkpointing.timeout: 10min
# Minimum pause between checkpoints
execution.checkpointing.min-pause: 5s
# Maximum concurrent checkpoints
execution.checkpointing.max-concurrent-checkpoints: 1
# Checkpoint cleanup
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Unaligned checkpoints
execution.checkpointing.unaligned: false
```

```yaml
# State backend type
state.backend.type: rocksdb
# Checkpoint storage directory
state.checkpoints.dir: s3://my-bucket/checkpoints
# Savepoint directory
state.savepoints.dir: s3://my-bucket/savepoints
# Incremental checkpoints
state.backend.incremental: true
```

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000);
// Configure checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set minimum pause between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// Set checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(600000);
// Allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain checkpoints on cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Enable unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// Set state backend
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// Set checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
```

```mermaid
graph LR
subgraph "Factors"
RT["Recovery Time"]
OH["Overhead"]
SS["State Size"]
end
subgraph "Trade-offs"
FI["Frequent Intervals<br/>• Lower recovery time<br/>• Higher overhead"]
II["Infrequent Intervals<br/>• Higher recovery time<br/>• Lower overhead"]
end
subgraph "Optimization"
OI["Optimal Interval<br/>Balance based on<br/>requirements"]
end
RT --> FI
OH --> II
SS --> OI
FI --> OI
II --> OI
style OI fill:#c8e6c9
```
- Checkpoint Interval:
  - Balance recovery time vs. overhead
  - Consider state size and I/O capacity
  - Monitor checkpoint duration
- State Backend Selection:
  - HashMapStateBackend for small state
  - EmbeddedRocksDBStateBackend for large state
  - Enable incremental checkpoints for RocksDB (sketched below)
- Unaligned Checkpoints:
  - Use for high-throughput scenarios
  - Avoid when I/O is the bottleneck
  - Monitor checkpoint sizes
- Storage Optimization:
  - Use high-performance storage (SSD)
  - Ensure sufficient I/O bandwidth
  - Configure appropriate replication
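As one way of applying these recommendations in code, the sketch below enables incremental RocksDB checkpoints and keyed-state snapshot compression. Assumptions: the RocksDB state backend dependency is on the classpath, and note that snapshot compression applies to full snapshots of keyed state, not to RocksDB's incremental files, which use RocksDB's own compression.

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Incremental checkpoints upload only the RocksDB files that changed
        // since the previous checkpoint, shrinking checkpoints for large state.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Compress full keyed-state snapshots, trading CPU for checkpoint size.
        env.getConfig().setUseSnapshotCompression(true);
    }
}
```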
```yaml
# Key metrics to monitor
checkpoint.duration: "Time to complete checkpoint"
checkpoint.size: "Size of checkpoint data"
checkpoint.alignment_time: "Time waiting for barriers"
checkpoint.count: "Number of completed checkpoints"
checkpoint.failed_count: "Number of failed checkpoints"
```
- Checkpoint Timeouts (see the sketch after this list):
  - Increase the checkpoint timeout
  - Reduce the checkpoint interval
  - Optimize state backend performance
- High Checkpoint Duration:
  - Enable incremental checkpoints
  - Optimize serialization
  - Increase parallelism
- Barrier Alignment Issues:
  - Enable unaligned checkpoints
  - Address backpressure
  - Optimize network configuration
- Storage Issues:
  - Ensure sufficient storage capacity
  - Verify storage accessibility
  - Monitor I/O performance
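When timeouts or slow checkpoints are only occasional, it can help to give individual checkpoints more room rather than failing the whole job. A minimal sketch, assuming the other defaults are acceptable, that raises the timeout and tolerates a few consecutive checkpoint failures before the job is restarted; the numbers are illustrative.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureHandlingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Allow a slow checkpoint up to 20 minutes before it is declared failed.
        env.getCheckpointConfig().setCheckpointTimeout(20 * 60 * 1000);

        // Tolerate up to 3 consecutive checkpoint failures before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
```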
- Check Checkpoint Metrics:

  ```bash
  # Via REST API
  curl http://jobmanager:8081/jobs/<job-id>/checkpoints

  # Via Web UI
  # http://jobmanager:8081/#/job/<job-id>/checkpoints
  ```

- Analyze Logs:

  ```bash
  # TaskManager logs
  grep -i checkpoint taskmanager.log

  # JobManager logs
  grep -i checkpoint jobmanager.log
  ```

- Monitor Resource Usage:
  - CPU utilization during checkpoints
  - Memory usage patterns
  - Disk I/O during snapshots
  - Network bandwidth utilization
- Checkpoint Corruption:
  - Verify storage integrity
  - Check for concurrent modifications
  - Restore from an earlier checkpoint
- Incompatible State:
  - Review state schema changes
  - Use state migration strategies
  - Consider savepoint compatibility (see the sketch after this list)
- Recovery Failures:
  - Check resource availability
  - Verify checkpoint accessibility
  - Review error logs for the root cause
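Incompatible-state problems are easier to avoid than to repair: when operators carry explicit, stable UIDs, Flink can map the state in a checkpoint or savepoint back to the right operator even after the job graph changes. A minimal sketch below; the source, operator logic, and uid strings are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StableUidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        DataStream<String> events = env.socketTextStream("localhost", 9999);

        events
            // A stable uid lets restored state find this operator even if the
            // surrounding topology is modified between deployments.
            .map(value -> value.toUpperCase()).uid("normalize-events")
            .filter(value -> !value.isEmpty()).uid("drop-empty")
            .print();

        env.execute("stable-uid-example");
    }
}
```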
- Configuration:
  - Set appropriate checkpoint intervals
  - Choose a suitable state backend
  - Configure external checkpoint storage
- Monitoring:
  - Track checkpoint metrics
  - Monitor storage performance
  - Set up alerting for failures
- Testing:
  - Test recovery scenarios
  - Validate checkpoint compatibility
  - Perform chaos engineering
- Operations:
  - Implement backup strategies
  - Document recovery procedures
  - Clean up old checkpoints regularly
This comprehensive guide covers the end-to-end checkpointing mechanism in Apache Flink, from basic concepts to advanced troubleshooting. The diagrams and examples provide practical insights for implementing and maintaining fault-tolerant Flink applications.