Created
August 31, 2017 00:42
-
-
Save eliaslevy/43eca44e92fdef44e6717c60ea46e4d2 to your computer and use it in GitHub Desktop.
EarlyFiringEventTimeTrigger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.state.ValueState | |
import org.apache.flink.api.common.state.ValueStateDescriptor | |
import org.apache.flink.api.common.typeinfo.TypeHint | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult.{CONTINUE, FIRE, FIRE_AND_PURGE, PURGE} | |
import org.apache.flink.streaming.api.windowing.time.Time | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow | |
/** | |
* A Trigger that fires early at a regular interval without purging if there have | |
* been new events added to the pane, and once purging when the watermark passes | |
* the end of the window to which a pane belongs. | |
*/ | |
class EarlyFiringEventTimeTrigger(firingInterval: Long) extends Trigger[Object, TimeWindow] { | |
// in milliseconds | |
private val interval = firingInterval | |
private val firstStateDesc: ValueStateDescriptor[Boolean] = | |
new ValueStateDescriptor("first", new TypeHint[Boolean](){}.getTypeInfo(), true) | |
private val moreStateDesc: ValueStateDescriptor[Boolean] = | |
new ValueStateDescriptor("more", new TypeHint[Boolean](){}.getTypeInfo(), false) | |
def onElement(element: Object, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { | |
val first: ValueState[Boolean] = ctx.getPartitionedState(firstStateDesc) | |
val more: ValueState[Boolean] = ctx.getPartitionedState(moreStateDesc ) | |
if (first.value) { | |
val nextEarlyFire = (timestamp - (timestamp % interval)) + interval | |
val nextFire = math.min(nextEarlyFire, window.maxTimestamp) | |
ctx.registerEventTimeTimer(nextFire) | |
first.update(false) | |
} | |
if (!more.value) more.update(true) | |
CONTINUE | |
} | |
def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { | |
val more: ValueState[Boolean] = ctx.getPartitionedState(moreStateDesc) | |
if (time >= window.maxTimestamp) { | |
if (more.value) { | |
more.update(false) | |
FIRE_AND_PURGE | |
} else { | |
PURGE | |
} | |
} else { | |
ctx.registerEventTimeTimer(time + interval) | |
if (more.value) { | |
more.update(false) | |
FIRE | |
} else { | |
CONTINUE | |
} | |
} | |
} | |
def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = { | |
CONTINUE | |
} | |
override def clear(window: TimeWindow, ctx: TriggerContext): Unit = { | |
ctx.getPartitionedState(firstStateDesc).clear() | |
ctx.getPartitionedState(moreStateDesc ).clear() | |
ctx.deleteEventTimeTimer(window.maxTimestamp) | |
} | |
override def toString = "EarlyFiringEventTimeTrigger()" | |
} | |
object EarlyFiringEventTimeTrigger { | |
def of(interval: Time) = new EarlyFiringEventTimeTrigger(interval.toMilliseconds) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment