Skip to content

Instantly share code, notes, and snippets.

@mrsimpson
Last active November 28, 2023 14:37
Show Gist options
  • Save mrsimpson/70fccac3d52bebbe319c6d488d4db949 to your computer and use it in GitHub Desktop.
Save mrsimpson/70fccac3d52bebbe319c6d488d4db949 to your computer and use it in GitHub Desktop.
Buffered sorter for a window in Flink
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Re-orders the elements of a stream by timestamp, at the cost of one window of latency.
 *
 * <p>Each invocation of {@link #process} keeps elements back in Flink list state instead of
 * emitting them right away. On the next invocation the buffered elements are emitted together with
 * those elements of the current window whose timestamp is lower than or equal to the highest
 * buffered timestamp, all sorted by timestamp. Elements of the current window that are newer than
 * the highest buffered timestamp are buffered in turn.
 *
 * <p>Note that buffered elements are only emitted by a later call to {@link #process}; this class
 * contains no other flush path.
 *
 * @param <T> Type of the objects to sort
 * @param <K> Type of the key for which the window is constructed
 */
public abstract class BufferedSorter<T, K> extends ProcessWindowFunction<T, T, K, TimeWindow> {

  /** Elements held back by the previous invocation of {@link #process}. */
  private transient ListState<T> previousWindowElementsState;

  /**
   * Obtains the list state named {@code previousWindowElements} from the runtime context, using
   * the type information supplied by {@link #getTypeInfo()}.
   *
   * @param parameters configuration passed in by the caller; not used here
   */
  @Override
  public void open(Configuration parameters) {
    ListStateDescriptor<T> listStateDescriptor =
        new ListStateDescriptor<>("previousWindowElements", getTypeInfo());
    previousWindowElementsState = getRuntimeContext().getListState(listStateDescriptor);
  }

  /**
   * Supplies the type information used to create the state descriptor for buffered elements.
   *
   * @return type information for {@code T}
   */
  protected abstract TypeInformation<T> getTypeInfo();

  /**
   * Emits the buffered elements merged with the not-newer part of the current window, in timestamp
   * order, and buffers the remainder of the current window.
   *
   * <p>If nothing is buffered yet, the whole current window is buffered and nothing is emitted.
   *
   * @param key key of the window being evaluated; not used here
   * @param context window context; not used here
   * @param elements elements of the current window
   * @param out collector receiving the re-ordered elements
   * @throws Exception if reading or updating the list state fails
   */
  @Override
  public void process(
      K key, BufferedSorter<T, K>.Context context, Iterable<T> elements, Collector<T> out)
      throws Exception {
    // Materialise both inputs once instead of re-iterating them for every pass.
    List<T> buffered = copyOf(previousWindowElementsState.get());
    List<T> current = copyOf(elements);

    if (buffered.isEmpty()) {
      // Nothing to merge against yet: hold the whole window back.
      previousWindowElementsState.update(current);
      return;
    }

    // Cut-off = highest buffered timestamp. Timestamp#after(Timestamp) is used so that the
    // cut-off, the partitioning below and the final sort all compare with the same
    // (nanosecond) precision; comparing by getTime() millis here could pick a buffered element
    // that is not the true maximum and let a held-back element be overtaken.
    Timestamp cutOff = null;
    for (T element : buffered) {
      Timestamp time = getTime(element);
      if (cutOff == null || time.after(cutOff)) {
        cutOff = time;
      }
    }

    // Buffered elements always go out; current elements only if they are not newer than cutOff.
    List<T> ready = new ArrayList<>(buffered);
    List<T> heldBack = new ArrayList<>();
    for (T element : current) {
      if (getTime(element).after(cutOff)) {
        heldBack.add(element);
      } else {
        ready.add(element);
      }
    }

    // List#sort is stable, so elements with equal timestamps keep their relative order
    // (buffered before current), matching the previous stream-based implementation.
    ready.sort(Comparator.comparing(this::getTime));
    for (T element : ready) {
      out.collect(element);
    }

    previousWindowElementsState.update(heldBack);
  }

  /**
   * Extracts the timestamp by which elements are ordered.
   *
   * @param element element to inspect
   * @return the element's timestamp
   */
  protected abstract Timestamp getTime(T element);

  /** Copies an iterable into a new mutable list; a {@code null} iterable yields an empty list. */
  private static <E> List<E> copyOf(Iterable<E> source) {
    List<E> copy = new ArrayList<>();
    if (source != null) {
      for (E element : source) {
        copy.add(element);
      }
    }
    return copy;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment