Last active
November 28, 2023 14:37
-
-
Save mrsimpson/70fccac3d52bebbe319c6d488d4db949 to your computer and use it in GitHub Desktop.
Buffered sorter for a window in Flink
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.sql.Timestamp; | |
import java.util.Comparator; | |
import java.util.Optional; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import java.util.stream.StreamSupport; | |
import org.apache.flink.api.common.state.ListState; | |
import org.apache.flink.api.common.state.ListStateDescriptor; | |
import org.apache.flink.api.common.typeinfo.TypeInformation; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import org.apache.flink.util.Collector; | |
/** | |
* Re-orders a stream for a particular timeframe. By introducing a latency which is defined by the | |
* window for which this function is called, the function allows to re-order elements. It does this | |
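 * by preserving the previous window's content in Flink state and emitting it together with those
 * elements of the current window whose timestamp is lower than or equal to the highest timestamp
 * of the previous window's content. Elements of the current window with a higher timestamp are
 * kept in state until a later window is processed.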
* | |
* @param <T> Type of the objects to sort | |
* @param <K> Type of the key for which the window is constructed | |
*/ | |
public abstract class BufferedSorter<T, K> extends ProcessWindowFunction<T, T, K, TimeWindow> { | |
private transient ListState<T> previousWindowElementsState; | |
@Override | |
public void open(Configuration parameters) { | |
ListStateDescriptor<T> listStateDescriptor = | |
new ListStateDescriptor<T>("previousWindowElements", getTypeInfo()); | |
previousWindowElementsState = getRuntimeContext().getListState(listStateDescriptor); | |
} | |
protected abstract TypeInformation<T> getTypeInfo(); | |
@Override | |
public void process( | |
K key, BufferedSorter<T, K>.Context context, Iterable<T> elements, Collector<T> out) | |
throws Exception { | |
Iterable<T> previousWindowElements = previousWindowElementsState.get(); | |
if (!previousWindowElements.iterator().hasNext()) { | |
previousWindowElementsState.update( | |
StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList())); | |
} else { | |
Optional<T> latest = | |
StreamSupport.stream(previousWindowElements.spliterator(), false) | |
.max(Comparator.comparingLong(e -> getTime(e).getTime())); | |
if (latest.isPresent()) { | |
Stream.concat( | |
StreamSupport.stream(previousWindowElements.spliterator(), false), | |
StreamSupport.stream(elements.spliterator(), false) | |
.filter( | |
item -> | |
getTime(item).before(getTime(latest.get())) | |
|| getTime(item).equals(getTime(latest.get())))) | |
.sorted(Comparator.comparing(this::getTime)) | |
.forEachOrdered(out::collect); | |
previousWindowElementsState.update( | |
StreamSupport.stream(elements.spliterator(), false) | |
.filter(item -> getTime(item).after(getTime(latest.get()))) | |
.collect(Collectors.toList())); | |
} | |
} | |
} | |
protected abstract Timestamp getTime(T element); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment