@coderplay
Last active July 10, 2025 22:02
Flink State Research

Flink State Types

Flink exposes eight main state types, split between keyed state and operator state and organized in an interface hierarchy. Each is described below:

1. Keyed State Types (for KeyedStream)

Core State Types:

1. ValueState<T>

  • Stores a single value per key
  • Operations: value(), update(T), clear()

2. ListState<T>

  • Stores a list of elements per key
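  • Operations: get() → Iterable<T>, add(T), addAll(List<T>), update(List<T>), clear()
  • More efficient than manually maintaining a list in ValueState: add() can append without reading and rewriting the whole list (the RocksDB backend implements it as a merge)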

3. MapState<UK, UV>

  • Stores key-value mappings per key
  • Operations: get(UK), put(UK, UV), putAll(Map<UK, UV>), remove(UK), contains(UK), entries(), keys(), values(), isEmpty(), iterator(), clear()

Aggregating State Types:

4. ReducingState<T>

  • Stores a single aggregated value using a ReduceFunction
  • Extends MergingState<T, T>
  • Operations: get(), add(T), clear()
  • Example: Sum, max, min operations

5. AggregatingState<IN, OUT>

  • Stores a single aggregated value using an AggregateFunction
  • Extends MergingState<IN, OUT>
  • Allows different input and output types (unlike ReducingState)
  • Operations: get(), add(IN), clear()
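
The difference between the two aggregating types can be sketched in plain Java. This is a minimal model, not Flink code: ReducingCell and AverageCell are hypothetical stand-ins for ReducingState<Long> and AggregatingState<Long, Double>. The reducing cell keeps one value of the input type, while the averaging cell keeps a (sum, count) accumulator and returns a different output type.

```java
import java.util.function.BinaryOperator;

/** Plain-Java model of the two aggregating state shapes; not Flink code. */
class AggregatingStateSketch {

    /** Models ReducingState<T>: input, stored value, and output share one type. */
    static final class ReducingCell<T> {
        private final BinaryOperator<T> reducer;
        private T current; // null until the first add()

        ReducingCell(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        T get() {
            return current;
        }

        void clear() {
            current = null;
        }
    }

    /** Models AggregatingState<Long, Double> with a (sum, count) accumulator. */
    static final class AverageCell {
        private long sum;
        private long count;

        void add(long value) {
            sum += value;
            count++;
        }

        /** Returns null when nothing has been added, else the running mean. */
        Double get() {
            return count == 0 ? null : (double) sum / count;
        }

        void clear() {
            sum = 0;
            count = 0;
        }
    }

    public static void main(String[] args) {
        ReducingCell<Long> total = new ReducingCell<>(Long::sum);
        AverageCell average = new AverageCell();
        for (long v : new long[] {4, 8, 15}) {
            total.add(v);
            average.add(v);
        }
        System.out.println(total.get());   // 27
        System.out.println(average.get()); // 9.0
    }
}
```

In both cases only the aggregate is stored, never the individual inputs, which is why these types stay small regardless of how many elements are added.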

2. Operator State Types (for non-keyed streams)

6. BroadcastState<K, V>

  • Stores broadcast state that's replicated across all operator instances
  • Critical: All task instances must store the same elements
  • Operations: put(K, V), putAll(Map<K, V>), remove(K), get(K), contains(K), entries(), iterator(), clear()

7. ReadOnlyBroadcastState<K, V>

  • Read-only view of BroadcastState
  • Operations: get(K), contains(K), immutableEntries()

3. Special List State for Operators

8. Operator ListState<T> (from OperatorStateStore)

  • Different from keyed ListState
  • Two variants:
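    • Regular ListState: Even-split redistribution on rescaling (the elements of all instances are concatenated and dealt out round-robin to the new instances)
    • Union ListState: Union redistribution (every operator instance receives the complete list of all elements)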
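
The two redistribution modes can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Flink's repartitioner: evenSplit and union are hypothetical helper names, and the exact assignment of elements to subtasks in a real job may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of operator list state redistribution; not Flink code. */
class OperatorStateRedistributionSketch {

    /** Even-split: deal the concatenated elements out to the new subtasks in turn. */
    static <T> List<List<T>> evenSplit(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<List<T>> result = emptyLists(newParallelism);
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T element : subtaskState) {
                result.get(next).add(element);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    /** Union: every new subtask receives the full concatenated list. */
    static <T> List<List<T>> union(List<List<T>> oldSubtaskStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    private static <T> List<List<T>> emptyLists(int n) {
        List<List<T>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        // Two old subtasks, rescaled to three new subtasks.
        List<List<String>> old = List.of(List.of("a", "b", "c"), List.of("d", "e"));
        System.out.println(evenSplit(old, 3));    // [[a, d], [b, e], [c]]
        System.out.println(union(old, 3).get(0)); // [a, b, c, d, e]
    }
}
```

With union redistribution each instance has to decide for itself which elements to keep after restore, so it suits small state such as source offsets rather than large lists.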

State Hierarchy

State (base interface)
├── ValueState<T>
├── AppendingState<IN, OUT>
│   └── MergingState<IN, OUT>
│       ├── ListState<T>
│       ├── ReducingState<T>
│       └── AggregatingState<IN, OUT>
├── MapState<UK, UV>
└── ReadOnlyBroadcastState<K, V>
    └── BroadcastState<K, V>

Usage Examples

// In a RichFunction's open() method
public void open(Configuration parameters) {
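    // 1. ValueState (the constructor taking a default value is deprecated;
    //    without it, value() returns null until the first update())
    ValueStateDescriptor<Long> valueDesc = new ValueStateDescriptor<>("count", Long.class);
    ValueState<Long> countState = getRuntimeContext().getState(valueDesc);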
    
    // 2. ListState  
    ListStateDescriptor<String> listDesc = new ListStateDescriptor<>("events", String.class);
    ListState<String> eventState = getRuntimeContext().getListState(listDesc);
    
    // 3. MapState
    MapStateDescriptor<String, Integer> mapDesc = new MapStateDescriptor<>("userCounts", String.class, Integer.class);
    MapState<String, Integer> userState = getRuntimeContext().getMapState(mapDesc);
    
    // 4. ReducingState
    ReducingStateDescriptor<Long> reducingDesc = new ReducingStateDescriptor<>("sum", Long::sum, Long.class);
    ReducingState<Long> sumState = getRuntimeContext().getReducingState(reducingDesc);
    
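    // 5. AggregatingState (type parameters are <IN, ACC, OUT>; AverageAggregateFunction is a
    //    user-defined AggregateFunction, not shown here, and Double.class is the accumulator type)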
    AggregatingStateDescriptor<Long, Double, Double> aggDesc = new AggregatingStateDescriptor<>(
        "average", new AverageAggregateFunction(), Double.class);
    AggregatingState<Long, Double> avgState = getRuntimeContext().getAggregatingState(aggDesc);
}

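// BroadcastState is obtained from the context of a broadcast process function,
// e.g. inside processBroadcastElement(); ruleStateDescriptor is a MapStateDescriptor<String, Rule>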
BroadcastState<String, Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);

Key Differences

  1. Keyed vs Operator State:

    • Keyed states are partitioned by key (ValueState, ListState, MapState, ReducingState, AggregatingState)
    • Operator states are per-operator instance (BroadcastState, Operator ListState)
  2. Aggregating vs Non-Aggregating:

    • ReducingState/AggregatingState automatically aggregate values
    • Others store raw values
  3. Redistribution Behavior:

    • Keyed states: Redistributed by key
    • Operator ListState: Even-split (round-robin) or union redistribution
    • BroadcastState: Full replication to all instances

Every state type listed here is included in checkpoints. State TTL (Time-To-Live) and queryable state are available for keyed state only; operator state and broadcast state support neither.

Physical Layout of MapState Key-Value Pairs in RocksDBStateBackend

Key Structure in RocksDB

For MapState, each map entry (user key-value pair) is stored as a separate RocksDB key-value pair. The RocksDB key has a composite structure with four components:

RocksDB Key = [KeyGroup] + [CurrentKey] + [Namespace] + [UserKey]

1. KeyGroup (Variable Size: 1 or 2 bytes)

  • Purpose: Identifies which key group this entry belongs to (for parallelism/sharding)
  • Size: keyGroupPrefixBytes (computed based on numberOfKeyGroups, which equals the job's max parallelism)
  • Format: Big-endian integer representation
  • Example: Up to 128 key groups → 1 byte; more than 128 (max parallelism is capped at 32768) → 2 bytes

2. CurrentKey (Variable Size)

  • Purpose: The actual key from setCurrentKey(newKey) - identifies the logical partition
  • Serialization: Uses the key serializer (TypeSerializer<K>)
  • Example: If key is String "user123" → serialized string bytes

3. Namespace (Variable Size)

  • Purpose: Provides isolation within the same key (e.g., different windows)
  • Serialization: Uses namespace serializer (TypeSerializer<N>)
  • Default: VoidNamespace.INSTANCE for non-windowed operations

4. UserKey (Variable Size)

  • Purpose: The map key from mapState.put(userKey, userValue)
  • Serialization: Uses user key serializer (TypeSerializer<UK>)
  • Example: If user key is String "sessionId" → serialized string bytes
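
The key-group prefix can be sketched as follows. This is a minimal model assuming the rule that one byte is used for up to 128 key groups and two bytes otherwise; prefixBytes and encodeKeyGroup are hypothetical names, not Flink methods.

```java
/** Plain-Java model of the key-group prefix; not Flink code. */
class KeyGroupPrefixSketch {

    /** One byte covers up to 128 key groups; anything larger needs two bytes. */
    static int prefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > 128 ? 2 : 1;
    }

    /** Encodes the key-group index big-endian into exactly prefixBytes bytes. */
    static byte[] encodeKeyGroup(int keyGroup, int prefixBytes) {
        byte[] out = new byte[prefixBytes];
        for (int i = 0; i < prefixBytes; i++) {
            int shift = 8 * (prefixBytes - 1 - i); // most significant byte first
            out[i] = (byte) (keyGroup >>> shift);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(prefixBytes(128));  // 1
        System.out.println(prefixBytes(4096)); // 2
        byte[] encoded = encodeKeyGroup(300, 2);
        System.out.printf("%02x %02x%n", encoded[0], encoded[1]); // 01 2c
    }
}
```

Writing the prefix big-endian keeps RocksDB's byte-wise key order aligned with key-group order, so all entries of one key group form a contiguous range.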

Value Structure in RocksDB

The RocksDB value contains the serialized user value with null handling:

RocksDB Value = [NullFlag (1 byte)] + [SerializedUserValue]

1. Null Flag (1 byte)

  • Purpose: Indicates whether the user value is null
  • Format: Boolean (1 byte)
  • Values: true if null, false if not null

2. Serialized User Value (Variable Size)

  • Purpose: The actual map value from mapState.put(userKey, userValue)
  • Serialization: Uses user value serializer (TypeSerializer<UV>)
  • Note: Only present if null flag is false
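
The null-flag layout can be reproduced with a short plain-Java sketch. It assumes a Long user value written as 8 big-endian bytes after the flag byte, and it follows the description above in omitting the value when the flag is set; encode and decode are hypothetical helper names.

```java
import java.nio.ByteBuffer;

/** Plain-Java model of the [NullFlag] + [SerializedUserValue] layout; not Flink code. */
class NullSensitiveValueSketch {

    /** Encodes a nullable Long as a 1-byte null flag followed by the value, if any. */
    static byte[] encode(Long value) {
        if (value == null) {
            return new byte[] {1}; // flag = true, no value bytes follow
        }
        // ByteBuffer is big-endian by default: flag = false (0x00), then 8 value bytes.
        return ByteBuffer.allocate(1 + Long.BYTES).put((byte) 0).putLong(value).array();
    }

    /** Reverses encode(): reads the flag first and the value only when present. */
    static Long decode(byte[] raw) {
        ByteBuffer in = ByteBuffer.wrap(raw);
        boolean isNull = in.get() != 0;
        if (isNull) {
            return null;
        }
        return in.getLong();
    }

    public static void main(String[] args) {
        System.out.println(encode(42L).length);  // 9
        System.out.println(encode(null).length); // 1
        System.out.println(decode(encode(42L))); // 42
    }
}
```

The flag lets a map hold a key whose value is null, which is distinct from the key being absent.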

Physical Storage Example

Let's say we have:

  • Current Key: "user123" (String)
  • Namespace: VoidNamespace.INSTANCE
  • User Key: "sessionId" (String)
  • User Value: 42L (Long)
  • Key Group: 5 (1 byte)

RocksDB Key Layout:

[0x05] + [serialized("user123")] + [serialized(VoidNamespace)] + [serialized("sessionId")]

RocksDB Value Layout:

[0x00] + [serialized(42L)]

(0x00 = false, indicating non-null value)
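
The key layout of this example can be assembled in plain Java to show how the four components are concatenated. This is only a sketch: encodeString is a stand-in encoding (one length byte plus UTF-8) and does not reproduce the bytes written by Flink's string serializer, and the namespace is assumed to occupy a single placeholder byte.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Plain-Java model of the composite RocksDB key; not Flink code. */
class CompositeKeySketch {

    /** Stand-in string encoding: one length byte followed by UTF-8 bytes. */
    static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 255) {
            throw new IllegalArgumentException("string too long for this sketch");
        }
        byte[] out = new byte[1 + utf8.length];
        out[0] = (byte) utf8.length;
        System.arraycopy(utf8, 0, out, 1, utf8.length);
        return out;
    }

    /** Concatenates the given byte arrays in order. */
    static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : parts) {
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    /** [KeyGroup] + [CurrentKey] + [Namespace] + [UserKey], with a 1-byte key-group prefix. */
    static byte[] buildKey(int keyGroup, String currentKey, String userKey) {
        byte[] keyGroupPrefix = {(byte) keyGroup};
        byte[] voidNamespace = {0}; // assumed single placeholder byte
        return concat(keyGroupPrefix, encodeString(currentKey), voidNamespace, encodeString(userKey));
    }

    public static void main(String[] args) {
        byte[] key = buildKey(5, "user123", "sessionId");
        System.out.println(key.length); // 20
        System.out.println(key[0]);     // 5
    }
}
```

Two map entries under the same current key and namespace differ only in the trailing UserKey bytes, so they share a common prefix and sit next to each other in RocksDB's sorted key space.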

Code Implementation Details

Key Construction (from RocksDBMapState.java)

// Line 109-110 in RocksDBMapState.java
byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);

// From AbstractRocksDBState.java line 147-150
<UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
        UK userKey, TypeSerializer<UK> userKeySerializer) throws IOException {
    return sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
            currentNamespace, namespaceSerializer, userKey, userKeySerializer);
}

Value Construction (from AbstractRocksDBState.java)

// Line 175-179
<T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> serializer) throws IOException {
    dataOutputView.clear();
    dataOutputView.writeBoolean(value == null);  // Null flag
    return serializeValueInternal(value, serializer);  // Actual value
}

Column Family Organization

  • Each MapState gets its own RocksDB Column Family
  • Column Family Name: Based on the state descriptor name
  • Isolation: Different states don't interfere with each other
  • Performance: Allows independent configuration and compaction

Key Benefits of This Layout

  1. Efficient Individual Access: Each map entry can be accessed directly without reading the entire map
  2. Partial Updates: Can update single map entries without rewriting the whole map
  3. Range Scans: Can iterate over all entries for a given key using prefix scans
  4. Memory Efficiency: Large maps don't need to fit entirely in memory
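  5. Checkpointing: With incremental checkpoints, only RocksDB files created since the previous checkpoint are uploaded, so changing a few entries does not force the whole map to be written again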
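
The prefix-scan idea behind point 3 can be modeled with a sorted map. This is a minimal sketch, not the RocksDB API: a TreeMap ordered by unsigned byte-wise comparison stands in for a column family, and PrefixScanSketch with its methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of iterating one map via a key prefix; not RocksDB code. */
class PrefixScanSketch {

    // Sorted by unsigned lexicographic byte order, like a byte-wise comparator.
    private final TreeMap<byte[], String> store =
            new TreeMap<>((a, b) -> Arrays.compareUnsigned(a, b));

    void put(byte[] key, String value) {
        store.put(key, value);
    }

    /** Returns the values of all entries whose key starts with prefix, in key order. */
    List<String> scan(byte[] prefix) {
        List<String> result = new ArrayList<>();
        // Seek to the first key >= prefix, then read until the prefix no longer matches.
        for (Map.Entry<byte[], String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(entry.getKey(), prefix)) {
                break;
            }
            result.add(entry.getValue());
        }
        return result;
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PrefixScanSketch db = new PrefixScanSketch();
        // Keys are [keyGroup, currentKey, namespace, userKey], one byte each for brevity.
        db.put(new byte[] {5, 1, 0, 20}, "b");
        db.put(new byte[] {5, 1, 0, 10}, "a");
        db.put(new byte[] {5, 2, 0, 10}, "other-key");
        System.out.println(db.scan(new byte[] {5, 1, 0})); // [a, b]
    }
}
```

Because entries sharing a prefix are contiguous in sorted order, the scan can stop at the first key that no longer matches instead of reading the rest of the store.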

Storage Efficiency Considerations

  • Key Overhead: Each map entry has the full composite key overhead
  • Small Values: May have significant overhead for small user values
  • Large Maps: Very efficient as entries are stored separately
  • Iteration: Uses RocksDB prefix iteration for efficient map traversal

This layout enables Flink to handle very large MapState instances efficiently while providing fast individual key access and updates.
