DefaultDelegationTokenManager periodically renews delegation tokens (Kerberos, HDFS, etc.) to keep
long-running Flink jobs alive. If the renewal task is delayed too long, tokens expire and jobs fail.
```
DelegationTokenManager.start()
└─ startTokensUpdate()                                  ← runs on cluster-io thread
   ├─ obtainDelegationTokensAndGetNextRenewal()         ← blocking network I/O (KDC, NameNode)
   ├─ listener.onNewTokensObtained()
   └─ scheduledExecutor.schedule(
          () -> ioExecutor.execute(this::startTokensUpdate),   ← two-hop chain
          renewalDelay,
          MILLISECONDS
      )
```
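The chain above can be sketched with plain JDK executors. This is a minimal sketch, not Flink's code. A single-thread ScheduledExecutorService stands in for the Pekko-backed scheduledExecutor. The hypothetical obtainTokensAndGetNextRenewalMs() stands in for obtainDelegationTokensAndGetNextRenewal(): it only counts calls and asks for renewal every 20 ms. The point it shows is that the timer thread does no renewal work itself, so every cycle has to win a slot on ioExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoHopRenewalSketch {
    private final ScheduledExecutorService scheduledExecutor; // stands in for the Pekko scheduler
    private final ExecutorService ioExecutor;                 // stands in for the cluster-io pool
    private final AtomicInteger renewals = new AtomicInteger();
    private final CountDownLatch remaining;                    // sketch only: stop after N cycles

    TwoHopRenewalSketch(ScheduledExecutorService scheduledExecutor,
                        ExecutorService ioExecutor, int cycles) {
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
        this.remaining = new CountDownLatch(cycles);
    }

    // Hypothetical stand-in for obtainDelegationTokensAndGetNextRenewal():
    // the real method does blocking network I/O; this one only counts calls.
    private long obtainTokensAndGetNextRenewalMs() {
        renewals.incrementAndGet();
        return 20; // pretend the tokens must be renewed again in 20 ms
    }

    void startTokensUpdate() {
        long renewalDelay = obtainTokensAndGetNextRenewalMs();
        remaining.countDown();
        if (remaining.getCount() == 0) {
            return; // sketch only: stop after the requested number of cycles
        }
        scheduledExecutor.schedule(
                // hop 1: the timer fires on the scheduler thread...
                // hop 2: ...which only enqueues the real work on the I/O pool
                () -> ioExecutor.execute(this::startTokensUpdate),
                renewalDelay,
                TimeUnit.MILLISECONDS);
    }

    /** Runs the chain for the given number of cycles and returns how many renewals ran. */
    static int runCycles(int cycles) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            TwoHopRenewalSketch sketch = new TwoHopRenewalSketch(scheduler, io, cycles);
            io.execute(sketch::startTokensUpdate); // first cycle runs on the I/O pool
            sketch.remaining.await(5, TimeUnit.SECONDS);
            return sketch.renewals.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            scheduler.shutdownNow();
            io.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("renewals = " + runCycles(3)); // prints "renewals = 3"
    }
}
```

Each cycle schedules the next one only after it finishes, so renewal cycles never overlap; a delay anywhere in the chain pushes every later renewal back.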
Two executors are involved:

| Executor | Type | Role |
|---|---|---|
| scheduledExecutor | Pekko actor-system scheduler (adaptive ForkJoinPool) | Fires the timer reliably |
| ioExecutor | Executors.newFixedThreadPool(4 × CPU_CORES) | Actually runs startTokensUpdate |
ClusterEntrypoint.java:388-397:

```java
ioExecutor = Executors.newFixedThreadPool(
        ClusterEntrypointUtils.getPoolSize(configuration), // 4 * availableProcessors() unless cluster.io-pool.size is set
        new ExecutorThreadFactory("cluster-io"));

delegationTokenManager = DefaultDelegationTokenManagerFactory.create(
        configuration, pluginManager,
        commonRpcService.getScheduledExecutor(), // scheduledExecutor
        ioExecutor);                              // shared pool
```

ClusterEntrypointUtils.java:114-118:

```java
config.get(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, 4 * Hardware.getNumberCPUCores());
```

Hardware.getNumberCPUCores() returns Runtime.getRuntime().availableProcessors(). On a container-aware JVM this reflects the container's CPU quota, not the host core count.
Example: a JM configured with cpu: 3:

```
availableProcessors() = 3
cluster-io pool size  = 4 × 3 = 12 threads
```
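The sizing rule can be written as a small helper. poolSize is a hypothetical name, and the sketch assumes the config.get(option, overrideDefault) semantics quoted above: an explicitly set cluster.io-pool.size wins, otherwise 4 × availableProcessors() is used. "Option not set" is modeled here as a null argument.

```java
public class IoPoolSizeSketch {
    /**
     * Mirrors config.get(CLUSTER_IO_EXECUTOR_POOL_SIZE, 4 * cores):
     * an explicit cluster.io-pool.size wins, otherwise 4 x cores.
     * A null configuredPoolSize stands for "option not set" (assumption of this sketch).
     */
    static int poolSize(Integer configuredPoolSize, int availableProcessors) {
        return configuredPoolSize != null ? configuredPoolSize : 4 * availableProcessors;
    }

    public static void main(String[] args) {
        // On a container-aware JVM this is the container's CPU quota
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("this JVM: availableProcessors() = " + cores
                + ", default pool = " + poolSize(null, cores));
        System.out.println("cpu: 3 -> pool = " + poolSize(null, 3));        // pool = 12
        System.out.println("cpu: 6 -> pool = " + poolSize(null, 6));        // pool = 24
        System.out.println("io-pool.size: 32 -> pool = " + poolSize(32, 3)); // pool = 32
    }
}
```

The first line of output depends on where the JVM runs; the fixed inputs reproduce the numbers used throughout this section.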
Executors.newFixedThreadPool() uses an unbounded LinkedBlockingQueue:
- Submissions never block and never throw RejectedExecutionException due to load.
- Tasks queue silently and wait for a free thread.
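The silent queueing is easy to reproduce. The sketch below uses hypothetical class and method names. It occupies every thread of a fixed pool with a task blocked on a latch, then submits more work. Every extra execute() call returns normally and the tasks accumulate in the queue. It relies on Executors.newFixedThreadPool returning a ThreadPoolExecutor (true of the OpenJDK implementation), so getQueue() can be inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueSketch {
    /**
     * Occupies every thread of a fixed pool, then submits extra tasks.
     * Returns how many submissions ended up waiting in the queue.
     */
    static int queuedAfterSaturation(int poolSize, int extraTasks) {
        // Backed by a ThreadPoolExecutor with an unbounded LinkedBlockingQueue
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch allBusy = new CountDownLatch(poolSize);
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    allBusy.countDown();
                    awaitQuietly(release); // simulates a long blocking I/O call
                });
            }
            awaitQuietly(allBusy); // every worker thread is now occupied
            for (int i = 0; i < extraTasks; i++) {
                pool.execute(() -> { }); // returns immediately, no RejectedExecutionException
            }
            return pool.getQueue().size();
        } finally {
            release.countDown(); // unblock the workers so the queued tasks can drain
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued = " + queuedAfterSaturation(12, 5)); // prints "queued = 5"
    }
}
```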
The exact same ioExecutor instance is passed to:

| Consumer | Operation |
|---|---|
| haServices | HA filesystem reads/writes (ZooKeeper, HDFS) |
| NettyShuffleEnvironment | Partition release (file I/O) |
| Hybrid shuffle (HsFileDataManager) | Async data flush and spill to disk |
| Sort-merge shuffle (SortMergeResultPartitionReadScheduler) | Partition read scheduling |
| RocksDB (CompactionScheduler) | SST compaction (disk I/O + CPU) |
| DefaultDelegationTokenManager | Token renewal (network I/O) |
The 4× multiplier in 4 * availableProcessors() shows that the pool is sized for blocking I/O, not computation:
- For CPU-bound work, the optimal pool size is roughly the core count; more threads only add context-switch overhead.
- For I/O-bound work, threads spend most of their time blocked inside kernel syscalls (read, write, fsync, socket waits). The OS scheduler marks them as not runnable, so they consume no CPU cycles while waiting.
- The 4× factor accounts for this: it creates enough slots for many blocking I/O operations to be in flight at once.
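One way to read the 4× factor is the thread-pool sizing heuristic from Brian Goetz's Java Concurrency in Practice. In that heuristic, N_cpu is the core count, U_cpu the target CPU utilization and W/C the ratio of time a task spends waiting to time it spends computing. Flink does not state that it derives 4× this way, so the worked numbers below are only an interpretation:

```latex
N_{\text{threads}} = N_{\text{cpu}} \cdot U_{\text{cpu}} \cdot \left(1 + \frac{W}{C}\right)
\quad\Rightarrow\quad
4\,N_{\text{cpu}} = N_{\text{cpu}} \cdot 1 \cdot \left(1 + \frac{W}{C}\right)
\quad\Rightarrow\quad
\frac{W}{C} = 3
```

Read this way, a 4× pool assumes each task is blocked about three quarters of the time (W/(W+C) = 3/4). Tasks that block far longer, such as multi-minute spills, can occupy every slot while the CPU sits mostly idle.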
The config description confirms it (ClusterOptions.java:81):
"The size of the IO executor pool used by the cluster to execute blocking IO operations"
```
cpu: 3 → pool size: 12 cluster-io threads

Core 0:     [thread 1 - shuffle spill write]   ← runnable, consuming CPU
Core 1:     [thread 2 - shuffle spill write]   ← runnable, consuming CPU
Core 2:     [thread 3 - HA ZooKeeper write]    ← runnable, consuming CPU

Thread 4:   blocked in kernel, waiting for disk write (shuffle)
Thread 5:   blocked in kernel, waiting for disk write (shuffle)
Thread 6:   blocked in kernel, waiting for disk write (shuffle)
Thread 7:   blocked in kernel, waiting for disk write (shuffle)
Thread 8:   blocked in kernel, waiting for RocksDB compaction I/O
Thread 9:   blocked in kernel, waiting for RocksDB compaction I/O
Thread 10:  blocked in kernel, waiting for RocksDB compaction I/O
Thread 11:  blocked in kernel, waiting for HA filesystem op
Thread 12:  blocked in kernel, waiting for HA filesystem op

LinkedBlockingQueue: [ startTokensUpdate ]     ← no thread free, waits here silently
```
The scheduled timer fires on time, and ioExecutor.execute(this::startTokensUpdate) succeeds immediately because the task is merely enqueued. It then sits in the queue until one of the 12 blocking operations completes; under heavy shuffle or compaction load this can take tens of minutes. If the wait outlasts the tokens' remaining lifetime, they expire before renewal runs and the job fails.
No error is logged, no warning is emitted, and no retry is triggered.
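A minimal JDK-only reproduction of the starvation described above. The class and method names are hypothetical. The pool size of 12 matches the cpu: 3 example, while the 1 s blocking tasks and the 100 ms timer are arbitrary stand-ins for long spills and for renewalDelay. The timer fires on schedule, but the renewal only runs once a worker frees up, so the measured queue delay comes out at roughly blockMs minus timerDelayMs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationSketch {
    /**
     * Fills every thread of a fixed "cluster-io" pool with a blocking task, then fires a
     * timer that hands a renewal task to that same pool (the two-hop chain). Returns how
     * long the renewal sat in the queue after the timer fired, in milliseconds.
     */
    static long renewalQueueDelayMs(int poolSize, long blockMs, long timerDelayMs) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(poolSize);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch allBusy = new CountDownLatch(poolSize);
        CompletableFuture<Long> queueDelay = new CompletableFuture<>();
        try {
            for (int i = 0; i < poolSize; i++) {
                ioExecutor.execute(() -> {
                    allBusy.countDown();
                    sleepQuietly(blockMs); // stands in for a long spill, compaction or HA call
                });
            }
            allBusy.await(); // every cluster-io thread is now occupied
            scheduler.schedule(() -> {
                long firedAt = System.nanoTime(); // the timer itself fires on time
                // execute() returns at once; the renewal just waits in the queue
                ioExecutor.execute(() -> queueDelay.complete(
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firedAt)));
            }, timerDelayMs, TimeUnit.MILLISECONDS);
            return queueDelay.get(blockMs + timerDelayMs + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
            ioExecutor.shutdownNow();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 12 threads as with cpu: 3; each blocking task lasts 1 s, the timer fires after 100 ms
        long waited = renewalQueueDelayMs(12, 1_000, 100);
        System.out.println("renewal waited " + waited + " ms in the queue");
    }
}
```

Nothing in this program logs or fails while the renewal waits, which mirrors the silent failure mode: the only symptom is the delay itself.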
Option 1 (code fix): give the token manager its own single-thread executor, isolated from cluster I/O load.

```java
// Dedicated thread: renewal never queues behind shuffle, compaction or HA I/O
ExecutorService tokenRenewalExecutor = Executors.newSingleThreadExecutor(
        new ExecutorThreadFactory("delegation-token-renewal"));

delegationTokenManager = DefaultDelegationTokenManagerFactory.create(
        configuration, pluginManager,
        commonRpcService.getScheduledExecutor(),
        tokenRenewalExecutor);
// tokenRenewalExecutor must also be shut down when the entrypoint stops, alongside ioExecutor
```

Rationale: Token renewal is lightweight (one network round-trip per provider) and latency-sensitive, so it should not share a pool with shuffle spills that can run for minutes. A dedicated thread costs negligible memory and removes this queue starvation entirely, regardless of cluster load.
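The effect of the isolation can be measured with a JDK-only sketch; this is a hypothetical illustration, not Flink code. A fixed cluster-io pool is fully saturated by blocking tasks, but the timer hands the renewal to a separate single-thread executor. That executor stands in for the proposed delegation-token-renewal executor, with the JDK default thread factory replacing Flink's ExecutorThreadFactory. The renewal starts almost immediately after the timer fires, however long the I/O tasks block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DedicatedExecutorSketch {
    /**
     * Saturates a fixed "cluster-io" pool, then fires a timer that hands the renewal
     * to a dedicated single-thread executor. Returns the renewal's queue wait in ms.
     */
    static long renewalQueueDelayMs(int ioPoolSize, long blockMs, long timerDelayMs) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(ioPoolSize);
        // Hypothetical stand-in for the proposed "delegation-token-renewal" executor
        ExecutorService tokenRenewalExecutor = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch allBusy = new CountDownLatch(ioPoolSize);
        CompletableFuture<Long> queueDelay = new CompletableFuture<>();
        try {
            for (int i = 0; i < ioPoolSize; i++) {
                ioExecutor.execute(() -> {
                    allBusy.countDown();
                    sleepQuietly(blockMs); // cluster-io stays saturated for blockMs
                });
            }
            allBusy.await();
            scheduler.schedule(() -> {
                long firedAt = System.nanoTime();
                // The renewal bypasses cluster-io entirely
                tokenRenewalExecutor.execute(() -> queueDelay.complete(
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firedAt)));
            }, timerDelayMs, TimeUnit.MILLISECONDS);
            return queueDelay.get(timerDelayMs + 5_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
            tokenRenewalExecutor.shutdownNow();
            ioExecutor.shutdownNow(); // interrupts the sleeping I/O stand-ins
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long waited = renewalQueueDelayMs(12, 1_000, 100);
        System.out.println("renewal waited " + waited + " ms with a dedicated executor");
    }
}
```

A single thread is enough because renewal cycles never overlap: each cycle schedules the next one only after it finishes. The renewal call itself can still be slowed by a slow KDC or NameNode, since that network I/O remains blocking.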
Option 2 (config): set the pool size explicitly instead of deriving it from the CPU quota.

```yaml
# flink-conf.yaml
cluster.io-pool.size: 32
```

Rationale: Decouples pool size from the container's CPU quota. More thread slots mean more concurrent blocking I/O operations before the pool is fully occupied. This does not eliminate contention: under high enough load the same starvation recurs, just at a higher threshold.
When to use: When a code change is not immediately possible. Treat as a temporary workaround.
Option 3 (infrastructure): raise the JM CPU quota, e.g. from cpu: 3 to cpu: 6.
Rationale: Hardware.getNumberCPUCores() calls Runtime.getRuntime().availableProcessors(),
which in a container reflects the CPU quota. More CPU → higher availableProcessors() → larger
pool automatically:
```
cpu: 3 → availableProcessors() = 3 → pool = 12 threads
cpu: 6 → availableProcessors() = 6 → pool = 24 threads
```
This is mechanically equivalent to option 2 (cluster.io-pool.size) but achieved through
infrastructure rather than config. The same caveat applies: the blocked threads are parked on I/O
and consume no CPU, so the extra cores themselves do not speed up I/O operations — they only
increase the number of thread slots. Under high enough load, starvation recurs at a higher threshold.
When to use: When the infrastructure team controls resource allocation and a config change to Flink is not desired. Still a workaround, not a fix.
| Dimension | Detail |
|---|---|
| Root cause | Fixed-size cluster-io pool (4 × CPU_CORES) shared with long-running blocking I/O tasks |
| Why CPU count is misleading | Blocked threads consume no CPU; thread slots, not cores, are the scarce resource |
| With cpu: 3 | Pool = 12 threads; easily saturated under shuffle/compaction load |
| Failure mode | Silent queue wait: no log, no error, no retry until a thread frees up |
| Config mitigation | cluster.io-pool.size: 32 raises the threshold but does not eliminate contention |
| Infra mitigation | Increase the container CPU quota; same effect as above, via infrastructure |
| Real fix | Dedicated single-thread executor for token renewal |