Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active January 25, 2017 20:54
Show Gist options
  • Save ktoso/939cf091a09b7e7cdbff4090b09755b7 to your computer and use it in GitHub Desktop.
Save ktoso/939cf091a09b7e7cdbff4090b09755b7 to your computer and use it in GitHub Desktop.
/**
* Groups items within a given time interval (unless the max size is reached, then earlier),
* and picks the single element to signal downstream using the provided `pick` function.
*/
public <A> Flow<A, A, NotUsed> debounceSelect(FiniteDuration interval, Function<List<A>, A> pick, int max) {
return Flow.<A>create().groupedWithin(max, interval).map(group -> pick.apply(group));
}
class RefreshSignal {
public RefreshSignal(Object a) {}
}
final SourceQueueWithComplete<RefreshSignal> queue =
Source.<RefreshSignal>queue(4, OverflowStrategy.dropHead()) // give it some buffer space, but we can drop signals anyway, since if we have at least one, it means we want to trigger one refresh
.via(debounceSelect(FiniteDuration.create(1, TimeUnit.SECONDS), it -> it.get(0), 100)) // picking any of the refresh signals
.to(Sink.actorRef(target, new RefreshSignal("done")))
.run(materializer); // your Materializer
final CompletionStage<QueueOfferResult> offer1 = queue.offer(new RefreshSignal("refresh"));
final CompletionStage<QueueOfferResult> offer2 = queue.offer(new RefreshSignal("refresh"));
final CompletionStage<QueueOfferResult> offer3 = queue.offer(new RefreshSignal("refresh"));
final CompletionStage<QueueOfferResult> offer4 = queue.offer(new RefreshSignal("refresh"));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment