Last active
June 23, 2021 20:25
-
-
Save rocketraman/543f066813fc89590f23ff5dacf43f01 to your computer and use it in GitHub Desktop.
Beam calendar day windows with context elements
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.xacoach.xascore.backend.analytics.windowfns | |
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.transforms.*
import org.apache.beam.sdk.transforms.windowing.*
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.beam.sdk.values.TypeDescriptors
import org.joda.time.DateTimeZone
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.ZonedDateTime
import kotlin.time.Duration
import kotlin.time.days
import org.joda.time.Duration as JodaTimeDuration
import org.joda.time.Instant as JodaInstant
/**
 * A calendar day window that provides context of a certain number of days prior. Since the only way to
 * trigger based on completeness of data in Beam is the AfterWatermark.pastEndOfWindow() trigger, we window
 * based on the context period, and then add the "last day"'s elements as late inputs into those context
 * windows.
 *
 * In that way, the window will trigger only once all the context is available.
 *
 * Concretely, with `c = contextDays` and `D` the calendar date of an element's timestamp in [timeZone], the element
 * is assigned to the `c + 1` windows `[D - c, D)`, `[D - c + 1, D + 1)`, ..., `[D, D + c)`, each `c` days long.
 * In `[D - c, D)` the element lies at or past the window end and plays the role of the "last day" element; in
 * every other window it is a context element.
 *
 * Instances can only be created through the companion `invoke`, which also configures the trigger this window
 * relies on.
 *
 * @param contextDays number of context days preceding the "last day"; must be positive.
 * @param timeZone zone whose calendar days define the window boundaries.
 */
class ContextualCalendarDayWindow<T> private constructor(
    private val contextDays: Int,
    private val timeZone: ZoneId = ZoneOffset.UTC,
) : NonMergingWindowFn<T, IntervalWindow>() {

    init {
        // With zero context days every window would be empty ([D, D)), so at least one context day is required.
        require(contextDays > 0) { "The number of context days must be positive, but was $contextDays." }
    }

    override fun assignWindows(c: AssignContext): Collection<IntervalWindow> =
        assignWindows(c.timestamp().asJava())

    override fun isCompatible(other: WindowFn<*, *>): Boolean = equals(other)

    override fun windowCoder(): Coder<IntervalWindow> = IntervalWindow.getCoder()

    /**
     * Maps a main-input window to the last window assigned to the main window's max timestamp, i.e. the window
     * that starts on that timestamp's calendar date.
     */
    override fun getDefaultWindowMappingFn(): WindowMappingFn<IntervalWindow> =
        object : WindowMappingFn<IntervalWindow>() {
            override fun getSideInputWindow(mainWindow: BoundedWindow): IntervalWindow {
                require(mainWindow !is GlobalWindow) {
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn"
                }
                return assignWindows(mainWindow.maxTimestamp().asJava()).last()
            }
        }

    override fun equals(other: Any?): Boolean {
        val that = other as? ContextualCalendarDayWindow<*> ?: return false
        return that.contextDays == contextDays && that.timeZone == timeZone
    }

    override fun hashCode(): Int = 31 * contextDays + timeZone.hashCode()

    /** Computes, in chronological order, the `contextDays + 1` windows an element at [timestamp] belongs to. */
    private fun assignWindows(timestamp: Instant): Collection<IntervalWindow> {
        val localDate = timestamp.atZone(timeZone).toLocalDate()
        return (0L..contextDays).map { offset ->
            val intervalBegin = localDate.minusDays(contextDays - offset)
            // windows span only the context days, so the element's own day falls just past the end of the first one
            val intervalEnd = intervalBegin.plusDays(contextDays.toLong())
            IntervalWindow(
                intervalBegin.atStartOfDay(timeZone).toInstant().asJoda(),
                intervalEnd.atStartOfDay(timeZone).toInstant().asJoda(),
            )
        }
    }

    companion object {
        /**
         * This window requires specific setup of the associated trigger function, so use an invoker to create the
         * window function, and configure it: fire at the end of the window only if non-empty, fire again on late
         * elements, and accumulate fired panes.
         *
         * @param days total number of calendar days, i.e. the current day plus `days - 1` days of context;
         *   must be at least 2.
         * @param allowedLateness how late elements may arrive and still be added to a window.
         * @param timeZone zone whose calendar days define the window boundaries.
         * @throws IllegalArgumentException if [days] is less than 2.
         */
        operator fun <T> invoke(days: Int, allowedLateness: Duration, timeZone: ZoneId = ZoneOffset.UTC): Window<T> {
            require(days >= 2) {
                "days must be at least 2 (the current day plus at least one day of context), but was $days."
            }
            return Window.into<T>(ContextualCalendarDayWindow(days - 1, timeZone))
                .triggering(
                    // no early trigger here, we only want to trigger on "late" elements which indicate our real elements
                    AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1)),
                )
                .withAllowedLateness(allowedLateness.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                // LATEST is generally inefficient, but we need it here to make the watermark move in such a way that
                // the downstream windowing in DailyWindowsWithContext produces output in the correct panes -- it's not
                // completely clear why that is the case because downstream of this in `ContextWindowsWithTimestamps`
                // we skew the timestamps forward anyway, however despite that if we don't use LATEST here, the panes
                // are incorrect
                .withTimestampCombiner(TimestampCombiner.LATEST)
                .accumulatingFiredPanes()
        }
    }
}
/**
 * Transforms inputs into daily windowed data, with a given number of days of contextual data attached. The input
 * is a PCollection of elements of type V. They are windowed via [ContextualCalendarDayWindow], keyed and grouped,
 * and then windowed again into daily windows. Each daily window holds that day's elements plus the context elements.
 *
 * The key and value type descriptors are needed because generic types are erased at runtime. The outputCoder may
 * also be needed to avoid coder lookup errors for the output type, but is not required.
 *
 * NOTE(review): the context windows accumulate fired panes and fire again on late elements, and this transform
 * flattens every upstream pane that lands in a daily pane. If several such panes for one key and day end up in the
 * same daily pane, context elements may appear more than once. Confirm downstream tolerates that.
 *
 * @param days number of calendar days per output, handed to [ContextualCalendarDayWindow] (current day plus
 *   `days - 1` days of context).
 * @param allowedLateness allowed lateness used for both the context windows and the daily windows.
 * @param inZone zone whose calendar days define the window boundaries.
 * @param keyFn extracts the grouping key from an element.
 * @param timestampFn returns the timestamp of an element; used to tell current-day elements from context elements.
 * @param keyTypeDescriptor type descriptor of the key type.
 * @param valueTypeDescriptor type descriptor of the element type.
 * @param outputCoder if non-null, set as the coder of the output collection.
 */
class DailyWindowsWithContext<K, V>(
    private val days: Int,
    private val allowedLateness: Duration,
    private val inZone: ZoneId = ZoneOffset.UTC,
    private val keyFn: SerializableFunction<V, K>,
    private val timestampFn: (V) -> Instant,
    private val keyTypeDescriptor: TypeDescriptor<K>,
    private val valueTypeDescriptor: TypeDescriptor<V>,
    private val outputCoder: Coder<KV<K, Iterable<@JvmWildcard V>>>? = null,
) : PTransform<PCollection<V>, PCollection<KV<K, Iterable<@JvmWildcard V>>>>() {

    override fun expand(input: PCollection<V>): PCollection<KV<K, Iterable<@JvmWildcard V>>> {
        val outputTypeDescriptor = TypeDescriptors.kvs(keyTypeDescriptor, TypeDescriptors.iterables(valueTypeDescriptor))

        // Stage 1: group per key into context windows and keep only panes carrying current-day data.
        // Then move each pane's timestamp forward so it lands in the matching daily window.
        val contextGroups = input
            .apply("ContextWindows", ContextualCalendarDayWindow(days, allowedLateness, inZone))
            .apply("ContextWindowsKeys", WithKeys.of(keyFn).withKeyType(keyTypeDescriptor))
            .apply("ContextWindowsGroupBy", GroupByKey.create())
            .apply("ContextWindowsFilterFn", ParDo.of(ContextWindowsFilterFn(timestampFn)))
            .apply("ContextWindowsWithTimestamps", ParDo.of(ContextWindowsWithTimestampFn(inZone, timestampFn)))

        // Stage 2: re-window into calendar days in the same zone
        val dailyWindowing = Window
            .into<KV<K, Iterable<@JvmWildcard V>>>(CalendarWindows.days(1).withTimeZone(inZone.asJoda()))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                    .withLateFirings(AfterPane.elementCountAtLeast(1)),
            )
            .withAllowedLateness(allowedLateness.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .discardingFiredPanes()

        val output: PCollection<KV<K, Iterable<@JvmWildcard V>>> = contextGroups
            .apply("DailyWindowsWithContext", dailyWindowing)
            .apply("DailyWindowsWithContextGroupBy", GroupByKey.create())
            // windowing + GroupByKey twice leaves an Iterable of Iterables per key; collapse it into one Iterable
            .apply(
                "DailyWindowsWithContextFlatten",
                MapElements.into(outputTypeDescriptor).via(ProcessFunction { grouped ->
                    KV.of(grouped.key, grouped.value.flatten())
                }),
            )

        outputCoder?.let { output.setCoder(it) }
        return output
    }
}
/**
 * Meant to be used after a [ContextualCalendarDayWindow]. It drops grouped outputs made up only of context elements,
 * i.e. those whose every value has a timestamp before the window end; such outputs are not useful downstream.
 * LATE panes are always passed through unchanged, since it is not possible to know here whether they are useful
 * downstream or not.
 *
 * @param timestampOf returns the timestamp of a value.
 */
private class ContextWindowsFilterFn<K, V>(
    val timestampOf: (V) -> Instant,
) : DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
    @ProcessElement
    fun process(
        window: IntervalWindow,
        paneInfo: PaneInfo,
        @Element element: KV<K, Iterable<@JvmWildcard V>>,
        receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>,
    ) {
        if (paneInfo.timing != PaneInfo.Timing.LATE) {
            val windowEnd = window.end().asJava()
            // every value precedes the window end => context only, nothing for the "last day"
            if (element.value.all { value -> timestampOf(value).isBefore(windowEnd) }) return
        }
        receiver.output(element)
    }
}
/**
 * Meant to be used after a [ContextualCalendarDayWindow] to move the timestamp of grouped elements forward into
 * their "correct" window.
 *
 * The target day is the calendar day (in [zoneId]) following the window's last day. A grouped element is
 * re-emitted only when the latest timestamp among its values falls within that day, bounds inclusive. It is then
 * stamped with that latest timestamp; otherwise it is dropped.
 *
 * @param zoneId zone whose calendar days define the day boundaries.
 * @param timestampOf returns the timestamp of a value.
 */
private class ContextWindowsWithTimestampFn<K, V>(
    val zoneId: ZoneId,
    val timestampOf: (V) -> Instant,
) : DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
    @ProcessElement
    fun process(
        @Element element: KV<K, Iterable<@JvmWildcard V>>,
        window: BoundedWindow,
        receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>,
    ) {
        val finalContextDay = window.asClosedRange().lastDay(zoneId)
        val latestTimestamp = element.value.maxOf { value -> timestampOf(value) }
        val targetDay = finalContextDay.plusDays(1).asRange(zoneId).toInstantRange()
        if (latestTimestamp !in targetDay) return
        receiver.outputWithTimestamp(element, latestTimestamp.asJoda())
    }
}
/** Converts this Kotlin [Duration] to a Joda-Time duration at millisecond precision. */
fun Duration.asJoda(): JodaTimeDuration {
    val millis = toLongMilliseconds()
    return JodaTimeDuration.millis(millis)
}

/** Converts this java.time [Instant] to a Joda-Time instant; precision finer than a millisecond is dropped. */
fun Instant.asJoda(): JodaInstant {
    val epochMillis = toEpochMilli()
    return JodaInstant.ofEpochMilli(epochMillis)
}

/** Converts this Joda-Time instant to a java.time [Instant]. */
fun JodaInstant.asJava(): Instant {
    val epochMillis = this.millis
    return Instant.ofEpochMilli(epochMillis)
}
/**
 * Converts this [ZoneId] to the equivalent Joda-Time [DateTimeZone].
 *
 * The zone is first normalized. Fixed-offset zones (`Z`, `+05:00`, and prefixed ids such as `UTC+01:00` or
 * `GMT+05:00`, which [ZoneId.normalized] reduces to a [ZoneOffset]) are converted by their offset. This is needed
 * because Joda-Time's `forID` does not accept all java.time spellings of such ids. All other (region) zones are
 * looked up by id.
 */
fun ZoneId.asJoda(): DateTimeZone = when (val zone = normalized()) {
    is ZoneOffset -> DateTimeZone.forOffsetMillis(zone.totalSeconds * 1000)
    else -> DateTimeZone.forID(zone.id)
}
/**
 * Returns this window's bounds as a [ClosedRange] of java.time instants, from `start()` to `end()`.
 *
 * Note that an [IntervalWindow]'s end is exclusive, whereas [ClosedRange.endInclusive] is nominally inclusive.
 * Callers must account for that when interpreting the range's end.
 *
 * @throws IllegalStateException if this window is not an [IntervalWindow].
 */
fun BoundedWindow.asClosedRange(): ClosedRange<Instant> = when (this) {
    is IntervalWindow -> start().asJava()..end().asJava()
    // `javaClass.name` (not a `javaClass::getCanonicalName` reference, which would print a function signature)
    else -> error("Window type ${javaClass.name} not supported")
}
/**
 * Returns the calendar date, in [inZone], of the instant one second before [ClosedRange.endInclusive].
 *
 * This treats the range end as exclusive: a range ending exactly at midnight yields the preceding date.
 */
fun ClosedRange<Instant>.lastDay(inZone: ZoneId): LocalDate {
    val justBeforeEnd = endInclusive.minusSeconds(1)
    return ZonedDateTime.ofInstant(justBeforeEnd, inZone).toLocalDate()
}
/** Converts both bounds of this zoned date-time range to [Instant]s, keeping the same closed-range shape. */
fun ClosedRange<ZonedDateTime>.toInstantRange(): ClosedRange<Instant> {
    val startInstant = start.toInstant()
    val endInstant = endInclusive.toInstant()
    return startInstant.rangeTo(endInstant)
}
/**
 * Returns the first instant of the calendar day after this date, in [zoneId].
 *
 * The next date is computed first and then resolved to its start of day. The alternative,
 * `atStartOfDay(zoneId).plusDays(1)`, would carry over the wall-clock time of this day's start. That time is not
 * midnight when this day began after a DST gap at midnight (e.g. America/Sao_Paulo on 2018-11-04, which started at
 * 01:00), and would give 01:00 on the next day instead of its real start.
 */
fun LocalDate.atStartOfNextDay(zoneId: ZoneId): ZonedDateTime = plusDays(1).atStartOfDay(zoneId)
/**
 * Returns this date as a closed range in [zoneId], from the start of this day up to and including the start of
 * the following day.
 */
fun LocalDate.asRange(zoneId: ZoneId): ClosedRange<ZonedDateTime> {
    val dayStart = atStartOfDay(zoneId)
    val nextDayStart = atStartOfNextDay(zoneId)
    return dayStart.rangeTo(nextDayStart)
}
/** Returns a Beam [TypeDescriptor] for the (erased) runtime class of the reified type [T]. */
inline fun <reified T : Any> typeDescriptor(): TypeDescriptor<T> {
    val rawClass: Class<T> = T::class.java
    return TypeDescriptor.of(rawClass)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment