Created
August 1, 2021 22:03
-
-
Save x/a8399b32900bcaf3b700bbb196b956e3 to your computer and use it in GitHub Desktop.
Apache Beam Summit - Windowing Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.oden.laser.common.transforms; | |
import org.apache.beam.sdk.transforms.windowing.*; | |
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; | |
import org.joda.time.Duration; | |
public class Windows { | |
/* | |
* The Window described attempts to both be prompt but not needlessly retrigger. | |
* It's designed to account for the following cases... | |
* <ul> | |
* <li> All data is coming on-time. The watermark at any given time is roughly the | |
* current time. | |
* <li> Data is being backfilled from some subset of metrics and the watermark is | |
* ahead of the event time of the windows for those metrics. | |
* <li> Data is being backfilled for some subset of metrics but the watermark has | |
* been stuck to be earlier than than event time for most metrics. | |
* <li> Any of cases 1, 2, or 3 but where late data has arrived due to some | |
* uncontrollable situation (i.e. a single metric for a pane gets stuck in | |
* pubsub for days and then is released). | |
* </ul> | |
* | |
* Additional Reading: - | |
* https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102 - | |
* https://beam.apache.org/documentation/programming-guide/#windowing - | |
* https://issues.apache.org/jira/browse/BEAM-644 | |
*/ | |
public static <T> Window<T> earlyAndLateFireSlidingWindow( | |
Duration windowSize, | |
Duration windowSlide, | |
Duration earlyFire, | |
Duration allowedLateness, | |
Duration offset) { | |
return Window.<T>into( | |
SlidingWindows.of(windowSize) | |
.every(windowSlide) | |
// In sliding windows, with a configurable window size plus a | |
// buffer(default at 0) on the end to provide space for | |
// calculating the last deltasum value(rollups). We add a offset | |
// (default at 0),which moves the window forward | |
// [start+offset, end+offset) to align with Heroic's | |
// exclusive start and inclusive end. | |
// .withOffset(windowSize.minus(deltasumBuffer).plus(offset))) | |
.withOffset(offset)) | |
// This sliding window will fire (materialize the accumulated | |
// data) at least once. Each time we do we'll fire with the | |
// accumulated data in the window so far (as opposed to just the | |
// new data since the last fire). | |
.accumulatingFiredPanes() | |
.triggering( | |
// The primary way that this window will fire is when the | |
// watermark (tracked upstream as the estimated minimum of the | |
// backlog) exceeds the end of the window. This is the only | |
// firing behavior for case 1 and the first firing behavior | |
// for cases 2 and 4. | |
AfterWatermark.pastEndOfWindow() | |
// In case 3, we don't want the user to have to wait until | |
// the watermark has caught up to get their data so we | |
// have a configurable threshold that will allow the | |
// window to fire early based on how much time has passed | |
// since the first element we saw in the pane. | |
.withEarlyFirings( | |
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFire)) | |
// In case 2, all elements are considered "late". And we | |
// don't want to excessively fire once for every element | |
// that gets added to the pane (i.e. 300 times for a 5 | |
// minute window). So, instead, we only late fire when new | |
// elements enter and the window's time has passed in | |
// process time. The assumption here is that backfilling a | |
// pane is, typically, faster than on-time filling. This | |
// introduces a small, but acceptable, lag in case 4. | |
.withLateFirings( | |
AfterAll.of( | |
AfterPane.elementCountAtLeast(1), | |
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowSize)))) | |
// When accounting for case 3, after the watermarket has caught | |
// up, the default behavior would be to fire the window again. | |
// This changes that behavior to only fire if any new data has | |
// arrived between the early fire and the on-time fire. | |
.withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY) | |
// This sets the duration we will retain the panes and accept late | |
// data in event time. | |
.withAllowedLateness(allowedLateness); | |
} | |
public static <T> Window<T> earlyAndLateFireSlidingWindow( | |
Duration windowSize, Duration windowSlide, Duration earlyFire, Duration allowedLateness) { | |
return earlyAndLateFireSlidingWindow( | |
windowSize, windowSlide, earlyFire, allowedLateness, Duration.ZERO); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment