Flink supports 8 main state types organized in a hierarchy. Here is an overview of each:
1. ValueState<T>
- Stores a single value per key
- Operations: value(), update(T), clear()
2. ListState<T>
- Stores a list of elements per key
- Operations: get() → Iterable<T>, add(T), addAll(List<T>), update(List<T>), clear()
- More efficient than manually maintaining a list in a ValueState: with the RocksDB state backend, add() appends without reading back the existing list
3. MapState<UK, UV>
- Stores key-value mappings per key
- Operations: get(UK), put(UK, UV), putAll(Map<UK, UV>), remove(UK), contains(UK), entries(), keys(), values(), isEmpty(), iterator(), clear()
4. ReducingState<T>
- Stores a single aggregated value, combined with a ReduceFunction as elements are added
- Extends MergingState<T, T>
- Operations: get(), add(T), clear()
- Example: sum, max, min
5. AggregatingState<IN, OUT>
- Stores a single aggregated value using an AggregateFunction
- Extends MergingState<IN, OUT>
- Allows different input and output types (unlike ReducingState), with a separate accumulator type in between
- Operations: get(), add(IN), clear()
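6. BroadcastState<K, V>
- Operator state holding a map that is replicated to every parallel instance of the operator
- Critical: updates must be deterministic so that all parallel instances end up storing the same elements; only the broadcast side (processBroadcastElement()) may modify it
- Operations: put(K, V), putAll(Map<K, V>), remove(K), get(K), contains(K), entries(), iterator(), clear()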
7. ReadOnlyBroadcastState<K, V>
- Read-only view of BroadcastState, handed to the non-broadcast side (processElement())
- Operations: get(K), contains(K), immutableEntries()
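8. Operator ListState<T> (from OperatorStateStore)
- Different from keyed ListState: scoped to a parallel operator instance rather than to a key
- Two variants, which differ in how the list is redistributed on rescaling:
  - Regular ListState (getListState()): even-split redistribution; the lists of all instances are combined and split evenly into one sublist per new instance
  - Union ListState (getUnionListState()): union redistribution; every instance receives the complete list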
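The two operator ListState redistribution schemes can be illustrated with a small Flink-free model. The class and method names below are hypothetical and this is not Flink's implementation: on restore, the lists checkpointed by all old instances are concatenated, then even-split gives each new instance a contiguous, near-equal slice, while union gives every new instance the complete list. How Flink assigns individual elements to instances may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of operator ListState redistribution on rescaling.
final class OperatorListRedistribution {

    // Concatenate the lists checkpointed by every old parallel instance.
    static <T> List<T> concat(List<List<T>> oldInstances) {
        List<T> all = new ArrayList<>();
        for (List<T> part : oldInstances) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split (getListState()): each new instance receives a contiguous, near-equal slice.
    static <T> List<List<T>> evenSplit(List<List<T>> oldInstances, int newParallelism) {
        List<T> all = concat(oldInstances);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` instances take one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union (getUnionListState()): every new instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> oldInstances, int newParallelism) {
        List<T> all = concat(oldInstances);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old instances, rescaled to three new ones.
        List<List<String>> old = List.of(List.of("p0", "p1", "p2"), List.of("p3", "p4"));
        System.out.println(evenSplit(old, 3)); // [[p0, p1], [p2, p3], [p4]]
        System.out.println(union(old, 3));
    }
}
```

Because every instance receives every element, union redistribution becomes expensive on restore when the list is large.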
State (base interface)
├── ValueState<T>
├── AppendingState<IN, OUT>
│   └── MergingState<IN, OUT>
│       ├── ListState<T> (as MergingState<T, Iterable<T>>)
│       ├── ReducingState<T> (as MergingState<T, T>)
│       └── AggregatingState<IN, OUT>
├── MapState<UK, UV>
└── ReadOnlyBroadcastState<K, V>
    └── BroadcastState<K, V>
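import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Fields of a rich function applied to a keyed stream (after keyBy()), e.g. a KeyedProcessFunction;
// keyed state is only available there. Keeping the handles in fields lets processElement() use them.
private transient ValueState<Long> countState;
private transient ListState<String> eventState;
private transient MapState<String, Integer> userState;
private transient ReducingState<Long> sumState;
private transient AggregatingState<Long, Double> avgState;

@Override
public void open(Configuration parameters) {
    // 1. ValueState: value() returns null until the first update() for the current key
    ValueStateDescriptor<Long> valueDesc = new ValueStateDescriptor<>("count", Long.class);
    countState = getRuntimeContext().getState(valueDesc);
    // 2. ListState
    ListStateDescriptor<String> listDesc = new ListStateDescriptor<>("events", String.class);
    eventState = getRuntimeContext().getListState(listDesc);
    // 3. MapState
    MapStateDescriptor<String, Integer> mapDesc = new MapStateDescriptor<>("userCounts", String.class, Integer.class);
    userState = getRuntimeContext().getMapState(mapDesc);
    // 4. ReducingState: Long::sum serves as the ReduceFunction<Long>
    ReducingStateDescriptor<Long> reducingDesc = new ReducingStateDescriptor<>("sum", Long::sum, Long.class);
    sumState = getRuntimeContext().getReducingState(reducingDesc);
    // 5. AggregatingState: type parameters are <IN, ACC, OUT>; the last argument describes the accumulator,
    //    here a (sum, count) pair. AverageAggregateFunction is a user-defined
    //    AggregateFunction<Long, Tuple2<Long, Long>, Double> (not shown).
    AggregatingStateDescriptor<Long, Tuple2<Long, Long>, Double> aggDesc = new AggregatingStateDescriptor<>(
        "average", new AverageAggregateFunction(), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
    avgState = getRuntimeContext().getAggregatingState(aggDesc);
}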
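What the ReducingState and AggregatingState handles above do on each add() can be modeled in plain Java. This is a hypothetical, Flink-free sketch: in Flink the state backend keeps one value or accumulator per key, which the model ignores. The AverageAggregateFunction here only mirrors the four methods of Flink's AggregateFunction interface (createAccumulator, add, getResult, merge) and uses a long[]{sum, count} accumulator in place of Tuple2<Long, Long>. An average cannot be updated from the previous average alone, which is why the accumulator type differs from the output type.

```java
import java.util.function.BinaryOperator;

// Hypothetical, Flink-free model of how ReducingState and AggregatingState fold values on add().
final class AggregatingStateModel {

    // ReducingState<T>: input and stored value share one type; add() applies the reduce function.
    static final class ReducingModel<T> {
        private final BinaryOperator<T> reduce;
        private T value; // null until the first add(), like an empty ReducingState

        ReducingModel(BinaryOperator<T> reduce) { this.reduce = reduce; }

        void add(T in) { value = (value == null) ? in : reduce.apply(value, in); }
        T get() { return value; }
    }

    // Mirrors the four methods of Flink's AggregateFunction<Long, ACC, Double>,
    // with a long[]{sum, count} accumulator standing in for Tuple2<Long, Long>.
    static final class AverageAggregateFunction {
        long[] createAccumulator() { return new long[] {0L, 0L}; }
        long[] add(Long value, long[] acc) { return new long[] {acc[0] + value, acc[1] + 1}; }
        Double getResult(long[] acc) { return acc[1] == 0 ? null : (double) acc[0] / acc[1]; }
        long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
    }

    // AggregatingState<IN, OUT>: stores the accumulator and exposes getResult() through get().
    static final class AggregatingModel {
        private final AverageAggregateFunction fn = new AverageAggregateFunction();
        private long[] acc; // null until the first add()

        void add(Long in) {
            if (acc == null) {
                acc = fn.createAccumulator();
            }
            acc = fn.add(in, acc);
        }
        Double get() { return acc == null ? null : fn.getResult(acc); }
    }

    public static void main(String[] args) {
        ReducingModel<Long> sum = new ReducingModel<>(Long::sum);
        AggregatingModel avg = new AggregatingModel();
        for (long v : new long[] {4L, 6L, 11L}) {
            sum.add(v);
            avg.add(v);
        }
        System.out.println(sum.get()); // 21
        System.out.println(avg.get()); // 7.0
    }
}
```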
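// In a (Keyed)BroadcastProcessFunction: processBroadcastElement() gets a writable BroadcastState,
// while processElement() only sees a ReadOnlyBroadcastState through its read-only context.
// ruleStateDescriptor is a MapStateDescriptor<String, Rule>.
BroadcastState<String, Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);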
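Why broadcast-side updates must be deterministic: every parallel instance receives the same broadcast elements and applies them to its own copy of the state. The Flink-free sketch below (all names hypothetical) gives each simulated instance its own map; an update that depends only on the broadcast element keeps the copies identical, while one that also reads instance-local data (here, a count of non-broadcast elements seen so far) lets them drift apart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of broadcast-state copies in parallel instances.
final class BroadcastReplicaModel {

    // One simulated parallel instance: its own copy of the broadcast map plus local data.
    static final class Instance {
        final Map<String, String> broadcastState = new HashMap<>();
        final long localElementsSeen; // differs per instance (non-broadcast input)

        Instance(long localElementsSeen) { this.localElementsSeen = localElementsSeen; }

        // Deterministic: the update depends only on the broadcast element itself.
        void applyDeterministic(String ruleId, String rule) {
            broadcastState.put(ruleId, rule);
        }

        // Not deterministic across instances: the update also depends on local data.
        void applyLocalDependent(String ruleId, String rule) {
            if (localElementsSeen > 10) {
                broadcastState.put(ruleId, rule);
            }
        }
    }

    static List<Instance> run(boolean deterministic) {
        List<Instance> instances = new ArrayList<>();
        for (long seen : new long[] {5, 20, 42}) {
            instances.add(new Instance(seen));
        }
        // Every instance receives the same broadcast elements.
        String[][] rules = {{"r1", "amount > 100"}, {"r2", "country == 'DE'"}};
        for (Instance inst : instances) {
            for (String[] r : rules) {
                if (deterministic) {
                    inst.applyDeterministic(r[0], r[1]);
                } else {
                    inst.applyLocalDependent(r[0], r[1]);
                }
            }
        }
        return instances;
    }

    // True if every instance holds exactly the same broadcast map.
    static boolean replicasAgree(List<Instance> instances) {
        Map<String, String> first = instances.get(0).broadcastState;
        for (Instance inst : instances) {
            if (!inst.broadcastState.equals(first)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(replicasAgree(run(true)));  // true
        System.out.println(replicasAgree(run(false))); // false
    }
}
```

Flink's documentation also notes that broadcast elements may arrive at different tasks in a different order, so updates should not depend on arrival order either.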
Keyed vs Operator State:
- Keyed states are partitioned by key (ValueState, ListState, MapState, ReducingState, AggregatingState)
- Operator states are scoped to each parallel operator instance (BroadcastState, operator ListState)
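Keyed partitioning means a single state handle resolves to a different value depending on the key of the element being processed. The Flink-free model below (hypothetical names, not Flink's implementation) stores one value per key behind a ValueState-like value()/update()/clear() interface and switches the current key before each element, which is what the runtime does before calling processElement().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model of keyed ValueState scoping: one handle, one value per key.
final class KeyedValueStateModel<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    // In Flink the runtime sets the current key before each processElement() call.
    void setCurrentKey(K key) { this.currentKey = key; }

    V value() { return valuesByKey.get(currentKey); } // null if nothing is stored for this key
    void update(V v) { valuesByKey.put(currentKey, v); }
    void clear() { valuesByKey.remove(currentKey); }

    // Counts events per key, the way a keyed ValueState<Long> "count" would.
    static Map<String, Long> countPerKey(String[] keys) {
        KeyedValueStateModel<String, Long> count = new KeyedValueStateModel<>();
        Map<String, Long> finalCounts = new HashMap<>();
        for (String key : keys) {
            count.setCurrentKey(key);
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
            finalCounts.put(key, count.value());
        }
        return finalCounts;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey(new String[] {"alice", "bob", "alice", "alice"}));
    }
}
```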
Aggregating vs Non-Aggregating:
- ReducingState/AggregatingState automatically aggregate values
- Others store raw values
Redistribution Behavior:
- Keyed states: redistributed by key group (each instance takes over a contiguous range of hashed keys)
- Operator ListState: even-split or union redistribution
- BroadcastState: Full replication to all instances
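Keyed state is redistributed in units of key groups: each key hashes to a fixed key group, and each operator instance owns a contiguous range of key groups, so rescaling moves whole key groups rather than individual keys. The sketch below mirrors that arithmetic under a stated simplification: it uses Math.floorMod(key.hashCode(), maxParallelism) where Flink applies a murmur hash to the key's hashCode(), and 128 is only an example maximum parallelism. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of keyed-state redistribution via key groups.
// Assumption: Math.floorMod(hashCode) stands in for Flink's murmur-hash based assignment.
final class KeyGroupModel {

    // A key always maps to the same key group, independent of the current parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key groups are assigned to operator instances in contiguous ranges.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // All key groups owned by one operator instance at the given parallelism.
    static List<Integer> keyGroupsOf(int operatorIndex, int maxParallelism, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (operatorIndexFor(kg, maxParallelism, parallelism) == operatorIndex) {
                owned.add(kg);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // example value
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        // The key group is fixed; only its owning instance changes when parallelism changes.
        System.out.println("key group " + keyGroup
                + " -> instance " + operatorIndexFor(keyGroup, maxParallelism, 2) + " of 2, "
                + "instance " + operatorIndexFor(keyGroup, maxParallelism, 4) + " of 4");
        System.out.println("instance 1 of 3 owns key groups " + keyGroupsOf(1, maxParallelism, 3));
    }
}
```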
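All state types are included in checkpoints and savepoints. State TTL (configured with StateTtlConfig on a state descriptor) applies only to keyed state, and Queryable State likewise exposes only keyed state; Queryable State is deprecated in recent Flink releases.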