gaborgsomogyi/63ae46aa8ac42a8c8731e23103d9a100

Delegation Token Renewal Starvation on Shared cluster-io Thread Pool

Context

DefaultDelegationTokenManager periodically renews delegation tokens (e.g. Hadoop/HDFS and HBase tokens, obtained via Kerberos) to keep long-running Flink jobs alive. If a renewal run is delayed past the tokens' expiry, the tokens expire and jobs fail.


How the Renewal Loop Works

DelegationTokenManager.start()
  └─ startTokensUpdate()                          ← runs on cluster-io thread
       ├─ obtainDelegationTokensAndGetNextRenewal()  ← blocking network I/O (KDC, NameNode)
       ├─ listener.onNewTokensObtained()
       └─ scheduledExecutor.schedule(
              () -> ioExecutor.execute(this::startTokensUpdate),  ← two-hop chain
              renewalDelay,
              MILLISECONDS
          )

Two executors are involved:

| Executor | Type | Role |
|---|---|---|
| scheduledExecutor | Pekko actor-system scheduler (adaptive ForkJoinPool) | Fires the timer reliably |
| ioExecutor | Executors.newFixedThreadPool(4 × CPU_CORES) | Actually runs startTokensUpdate |
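The two-hop chain can be sketched with plain JDK executors. This is a minimal illustration under stated assumptions, not Flink's implementation: the class `TwoHopRenewalLoop`, the `runDemo` helper and the `maxRuns` bound are hypothetical, a single-thread `ScheduledExecutorService` stands in for the Pekko scheduler, and token fetching is replaced by a print statement.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the two-hop renewal chain (not Flink's code).
public class TwoHopRenewalLoop {
    private final ScheduledExecutorService scheduledExecutor; // hop 1: only fires the timer
    private final ExecutorService ioExecutor;                 // hop 2: runs the actual work
    private final long renewalDelayMs;
    private final int maxRuns;                                // bounds the demo so it terminates
    final AtomicInteger runs = new AtomicInteger();
    final CountDownLatch done;

    TwoHopRenewalLoop(ScheduledExecutorService scheduledExecutor, ExecutorService ioExecutor,
                      long renewalDelayMs, int maxRuns) {
        this.scheduledExecutor = scheduledExecutor;
        this.ioExecutor = ioExecutor;
        this.renewalDelayMs = renewalDelayMs;
        this.maxRuns = maxRuns;
        this.done = new CountDownLatch(maxRuns);
    }

    void startTokensUpdate() {
        // Stand-in for obtainDelegationTokensAndGetNextRenewal() and the listener callback.
        int n = runs.incrementAndGet();
        System.out.println("renewal #" + n + " on " + Thread.currentThread().getName());
        done.countDown();
        if (n < maxRuns) {
            // The timer callback does not renew; it only enqueues the next run on ioExecutor.
            scheduledExecutor.schedule(
                    () -> ioExecutor.execute(this::startTokensUpdate),
                    renewalDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    // Runs the loop until maxRuns renewals completed and returns how many ran.
    static int runDemo(long renewalDelayMs, int maxRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService io = Executors.newFixedThreadPool(2);
        try {
            TwoHopRenewalLoop loop = new TwoHopRenewalLoop(scheduler, io, renewalDelayMs, maxRuns);
            io.execute(loop::startTokensUpdate); // the first run also lands on the I/O pool
            loop.done.await(10, TimeUnit.SECONDS);
            return loop.runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            scheduler.shutdownNow();
            io.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed renewals: " + runDemo(50, 3));
    }
}
```

Every renewal, including the first, executes on an `ioExecutor` thread; the scheduler thread never does token work, so the loop's latency is bounded by how quickly `ioExecutor` picks up queued tasks.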

ClusterEntrypoint.java:388-397:

ioExecutor = Executors.newFixedThreadPool(
        ClusterEntrypointUtils.getPoolSize(configuration),   // = 4 * availableProcessors()
        new ExecutorThreadFactory("cluster-io"));

delegationTokenManager = DefaultDelegationTokenManagerFactory.create(
        configuration, pluginManager,
        commonRpcService.getScheduledExecutor(),   // scheduledExecutor
        ioExecutor);                               // shared pool

The Thread Pool

ClusterEntrypointUtils.java:114-118:

config.get(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, 4 * Hardware.getNumberCPUCores());

Hardware.getNumberCPUCores() is Runtime.getRuntime().availableProcessors(). On a container-aware JVM (JDK 10+, or JDK 8u191+) this reflects the container CPU quota, not the host core count.

Example: JM configured with cpu: 3

availableProcessors() = 3
cluster-io pool size  = 4 × 3 = 12 threads
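The sizing rule above fits in one expression. The sketch below restates it; the class `ClusterIoPoolSize` and the use of `OptionalInt` to model "cluster.io-pool.size set or not" are illustrative assumptions, not Flink's `ClusterEntrypointUtils`.

```java
import java.util.OptionalInt;

// Hypothetical restatement of the cluster-io sizing rule; not Flink's implementation.
public class ClusterIoPoolSize {
    static final int DEFAULT_MULTIPLIER = 4;

    // An explicit cluster.io-pool.size wins; otherwise the pool is 4 x available processors.
    static int poolSize(OptionalInt configuredPoolSize, int availableProcessors) {
        return configuredPoolSize.orElse(DEFAULT_MULTIPLIER * availableProcessors);
    }

    public static void main(String[] args) {
        // On a container-aware JVM this value follows the container CPU quota.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("this JVM: " + cores + " cores -> "
                + poolSize(OptionalInt.empty(), cores) + " threads");
        System.out.println("cpu: 3 -> " + poolSize(OptionalInt.empty(), 3)); // cpu: 3 -> 12
        System.out.println("cpu: 6 -> " + poolSize(OptionalInt.empty(), 6)); // cpu: 6 -> 24
        System.out.println("cluster.io-pool.size: 32 -> "
                + poolSize(OptionalInt.of(32), 3));                          // ... -> 32
    }
}
```

The cpu: 6 case matches the 24-thread figure in option 3 below, and the explicit override corresponds to option 2.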

Executors.newFixedThreadPool() uses an unbounded LinkedBlockingQueue:

  • Submissions never block, never throw RejectedExecutionException due to load
  • Tasks queue silently and wait for a free thread
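This silent queueing can be reproduced with the JDK alone. In the sketch below (class and helper names are illustrative) every thread of a fixed pool is parked on a latch, then extra tasks are submitted: `execute` returns normally, nothing is rejected, and the tasks simply appear in the pool's queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative demo: a saturated fixed pool accepts extra work silently into its queue.
public class SilentQueueDemo {

    // Returns the queue length observed after saturating a pool of the given size.
    static int queuedAfterSaturation(int poolSize, int extraTasks) {
        // newFixedThreadPool returns a ThreadPoolExecutor backed by an unbounded LinkedBlockingQueue.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch started = new CountDownLatch(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy every thread with a task that blocks until released.
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    started.countDown();
                    awaitQuietly(release);
                });
            }
            started.await();
            // These submissions neither block nor throw; they are only enqueued.
            for (int i = 0; i < extraTasks; i++) {
                pool.execute(() -> { });
            }
            return pool.getQueue().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedAfterSaturation(12, 1)); // queued tasks: 1
    }
}
```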

What Shares the Same Pool

The exact same ioExecutor instance is passed to:

Consumer Operation
haServices HA filesystem reads/writes (ZooKeeper, HDFS)
NettyShuffleEnvironment Partition release (file I/O)
Hybrid shuffle (HsFileDataManager) Async data flush and spill to disk
Sort-merge shuffle (SortMergeResultPartitionReadScheduler) Partition read scheduling
RocksDB (CompactionScheduler) SST compaction (disk I/O + CPU)
DefaultDelegationTokenManager Token renewal (network I/O)

Why These Are I/O-Bound (Threads Park, Not Spin)

The multiplier in 4 * availableProcessors() is the clearest structural signal:

  • For CPU-bound work, the optimal pool size equals core count — more threads only add context-switch overhead.
  • For I/O-bound work, threads spend most of their time blocked inside kernel syscalls (read, write, fsync, socket wait). The OS scheduler marks them as non-runnable; they consume no CPU cycles while waiting.
  • The 4× factor accounts for this: it provides enough thread slots for many blocking I/O operations to be in flight at the same time.
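One way to read the multiplier, offered as an interpretation rather than Flink's documented rationale, is the common thread-pool sizing heuristic popularized by Java Concurrency in Practice. Here $W/C$ is the ratio of time a task spends waiting to time it spends computing, and target CPU utilization is taken as 1:

```latex
N_{\text{threads}} = N_{\text{cpu}} \left(1 + \frac{W}{C}\right)
\quad\Longrightarrow\quad
4\,N_{\text{cpu}} = N_{\text{cpu}} \left(1 + \frac{W}{C}\right)
\quad\Longrightarrow\quad
\frac{W}{C} = 3
```

Under this reading, the default pool is sized for tasks that are blocked about 75% of the time ($W/(W+C) = 3/4$); with cpu: 3 that gives 12 threads.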

The config description confirms it (ClusterOptions.java:81):

"The size of the IO executor pool used by the cluster to execute blocking IO operations"


The Starvation Scenario

cpu: 3  →  pool size: 12 cluster-io threads

Core 0: [thread 1  - shuffle spill write   ] ← runnable, consuming CPU
Core 1: [thread 2  - shuffle spill write   ] ← runnable, consuming CPU
Core 2: [thread 3  - HA ZooKeeper write    ] ← runnable, consuming CPU

Thread  4: blocked in kernel — waiting for disk write (shuffle)
Thread  5: blocked in kernel — waiting for disk write (shuffle)
Thread  6: blocked in kernel — waiting for disk write (shuffle)
Thread  7: blocked in kernel — waiting for disk write (shuffle)
Thread  8: blocked in kernel — waiting for RocksDB compaction I/O
Thread  9: blocked in kernel — waiting for RocksDB compaction I/O
Thread 10: blocked in kernel — waiting for RocksDB compaction I/O
Thread 11: blocked in kernel — waiting for HA filesystem op
Thread 12: blocked in kernel — waiting for HA filesystem op

LinkedBlockingQueue: [ startTokensUpdate ] ← no thread free, waits here silently

The scheduled timer fires on time. ioExecutor.execute(this::startTokensUpdate) succeeds immediately because the task is simply enqueued. It then sits in the queue until one of the 12 occupied threads finishes its current task and, because the queue is FIFO, until every task enqueued before it has also been picked up. Under heavy shuffle or compaction load this can take tens of minutes.
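The scenario can be simulated with `Thread.sleep` standing in for blocking I/O. This is a sketch under that assumption; `RenewalStarvationDemo` and `renewalQueueWaitMs` are hypothetical names. It measures the gap between the timer firing and the renewal task actually starting on the saturated pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simulation of the starvation scenario; Thread.sleep stands in for blocking I/O.
public class RenewalStarvationDemo {

    // Returns how long (ms) the renewal waited between the timer firing and the task starting.
    static long renewalQueueWaitMs(int poolSize, long blockMs, long renewalDelayMs) {
        ExecutorService sharedIo = Executors.newFixedThreadPool(poolSize);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Long> waited = new CompletableFuture<>();
        try {
            // Occupy every shared thread with a long "blocking I/O" task.
            for (int i = 0; i < poolSize; i++) {
                sharedIo.execute(() -> sleepQuietly(blockMs));
            }
            scheduler.schedule(() -> {
                long firedAt = System.nanoTime();        // hop 1: the timer fires on time
                sharedIo.execute(() -> waited.complete(  // hop 2: enqueued, never rejected
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firedAt)));
            }, renewalDelayMs, TimeUnit.MILLISECONDS);
            return waited.get(blockMs + renewalDelayMs + 10_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
            sharedIo.shutdownNow();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 12 threads as in the cpu: 3 example; timer due after 100 ms, "I/O" lasts 1 s.
        long waitMs = renewalQueueWaitMs(12, 1_000, 100);
        System.out.println("timer fired on time, but renewal waited ~" + waitMs + " ms in the queue");
    }
}
```

The wait is roughly the blocking time minus the renewal delay, and it grows with whatever the blocking tasks take, which is the mechanism behind the tens-of-minutes delays described above.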

No error is logged. No warning is emitted. No retry is triggered.
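One way to make this wait visible is to timestamp each task at submission and log when it finally starts. The decorator below is a hypothetical sketch, not an existing Flink or JDK API; the class name, threshold parameter and `java.util.logging` usage are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical decorator: logs a warning when a task waited too long for a free thread.
public class QueueWaitWarningExecutor implements Executor {
    private static final Logger LOG = Logger.getLogger(QueueWaitWarningExecutor.class.getName());

    private final Executor delegate;
    private final long warnThresholdNanos;
    final AtomicLong maxObservedWaitNanos = new AtomicLong();

    QueueWaitWarningExecutor(Executor delegate, long warnThreshold, TimeUnit unit) {
        this.delegate = delegate;
        this.warnThresholdNanos = unit.toNanos(warnThreshold);
    }

    @Override
    public void execute(Runnable task) {
        long enqueuedAt = System.nanoTime();
        delegate.execute(() -> {
            // Time between submission and the moment a pool thread picked the task up.
            long waitedNanos = System.nanoTime() - enqueuedAt;
            maxObservedWaitNanos.accumulateAndGet(waitedNanos, Math::max);
            if (waitedNanos > warnThresholdNanos) {
                LOG.warning("Task waited " + TimeUnit.NANOSECONDS.toMillis(waitedNanos)
                        + " ms for a free executor thread");
            }
            task.run();
        });
    }

    // Saturates a 1-thread pool for blockMs, then returns the queue wait of one wrapped task.
    static long observedWaitMs(long blockMs) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        QueueWaitWarningExecutor watched =
                new QueueWaitWarningExecutor(pool, blockMs / 2, TimeUnit.MILLISECONDS);
        CountDownLatch ran = new CountDownLatch(1);
        try {
            pool.execute(() -> sleepQuietly(blockMs)); // occupies the only thread
            watched.execute(ran::countDown);          // queued behind it; logs a WARNING when run
            ran.await(blockMs + 10_000, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(watched.maxObservedWaitNanos.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("observed queue wait: ~" + observedWaitMs(300) + " ms");
    }
}
```

Wrapping the executor handed to the token manager this way would not prevent starvation, but it would turn a silent delay into a log line.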


Possible Solutions

1. Dedicated executor for token renewal (code change — proper fix)

Give the token manager its own single-thread executor, completely isolated from cluster I/O load.

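ExecutorService tokenRenewalExecutor = Executors.newSingleThreadExecutor(
        new ExecutorThreadFactory("delegation-token-renewal"));   // isolated from cluster-io load

delegationTokenManager = DefaultDelegationTokenManagerFactory.create(
        configuration, pluginManager,
        commonRpcService.getScheduledExecutor(),   // still only fires the timer
        tokenRenewalExecutor);                     // runs startTokensUpdate on its own thread

// tokenRenewalExecutor must also be shut down when the entrypoint stops, just like ioExecutor.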

Rationale: Token renewal is lightweight (one network round-trip per provider) and latency-sensitive, so it should not share a pool with multi-minute shuffle spills. A dedicated thread costs negligible memory and removes this starvation path entirely, regardless of cluster load.
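The effect of isolation can be checked with a small comparison. In this sketch (names are hypothetical, and a plain JDK thread factory stands in for Flink's `ExecutorThreadFactory`) the shared pool is saturated with sleeping tasks, and the start latency of a renewal submitted to the shared pool is compared with one submitted to a dedicated single-thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical comparison: renewal start latency on a saturated shared pool vs a dedicated thread.
public class DedicatedExecutorDemo {

    static long renewalStartLatencyMs(boolean dedicated, int sharedPoolSize, long blockMs) {
        ExecutorService sharedIo = Executors.newFixedThreadPool(sharedPoolSize);
        // Plain JDK thread factory standing in for Flink's ExecutorThreadFactory.
        ExecutorService tokenRenewalExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "delegation-token-renewal");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch allBusy = new CountDownLatch(sharedPoolSize);
        CompletableFuture<Long> latency = new CompletableFuture<>();
        try {
            // Saturate the shared pool with long blocking tasks and wait until all are running.
            for (int i = 0; i < sharedPoolSize; i++) {
                sharedIo.execute(() -> {
                    allBusy.countDown();
                    sleepQuietly(blockMs);
                });
            }
            allBusy.await();
            ExecutorService target = dedicated ? tokenRenewalExecutor : sharedIo;
            long submittedAt = System.nanoTime();
            target.execute(() -> latency.complete(
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submittedAt)));
            return latency.get(blockMs + 10_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            tokenRenewalExecutor.shutdownNow();
            sharedIo.shutdownNow();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:      ~" + renewalStartLatencyMs(false, 12, 1_000) + " ms");
        System.out.println("dedicated thread: ~" + renewalStartLatencyMs(true, 12, 1_000) + " ms");
    }
}
```

On the shared pool the renewal waits for a blocking task to finish; on the dedicated executor it starts almost immediately, independent of how busy the shared pool is.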


2. Increase pool size via config (mitigation — config-only)

# flink-conf.yaml (or config.yaml in newer Flink versions)
cluster.io-pool.size: 32

Rationale: Decouples pool size from the container's CPU quota. More thread slots mean more concurrent blocking I/O operations before the pool is fully occupied. Does not eliminate contention — under high enough load the same starvation recurs with a higher threshold.

When to use: When a code change is not immediately possible. Treat as a temporary workaround.


3. Increase container CPU allocation (mitigation — infrastructure-only)

Raise the JM CPU quota, e.g. from cpu: 3 to cpu: 6.

Rationale: Hardware.getNumberCPUCores() calls Runtime.getRuntime().availableProcessors(), which in a container reflects the CPU quota. More CPU → higher availableProcessors() → larger pool automatically:

cpu: 3  →  availableProcessors() = 3  →  pool = 12 threads
cpu: 6  →  availableProcessors() = 6  →  pool = 24 threads

This is mechanically equivalent to option 2 (cluster.io-pool.size) but achieved through infrastructure rather than config. The same caveat applies: the blocked threads are parked on I/O and consume no CPU, so the extra cores themselves do not speed up I/O operations — they only increase the number of thread slots. Under high enough load, starvation recurs at a higher threshold.

When to use: When the infrastructure team controls resource allocation and a config change to Flink is not desired. Still a workaround, not a fix.


Summary

| Dimension | Detail |
|---|---|
| Root cause | Fixed-size cluster-io pool (4 × CPU_CORES) shared with long-running blocking I/O tasks |
| Why CPU count is misleading | Blocked threads consume no CPU; thread slots, not cores, are the scarce resource |
| With cpu: 3 | Pool = 12 threads; easily saturated under shuffle/compaction load |
| Failure mode | Silent queue wait: no log, no error, no retry until a thread frees up |
| Config mitigation | cluster.io-pool.size: 32 raises the threshold but does not eliminate contention |
| Infra mitigation | Increase the container CPU quota: same effect as above, via infrastructure |
| Real fix | Dedicated single-thread executor for token renewal |