Skip to content

Instantly share code, notes, and snippets.

@nambrot
Created December 2, 2017 23:36
Show Gist options
  • Save nambrot/c450562ae69e2545281d1658a337d44d to your computer and use it in GitHub Desktop.
Save nambrot/c450562ae69e2545281d1658a337d44d to your computer and use it in GitHub Desktop.
/**
 * Value kept in the "Limiter" state store for each key.
 *
 * @param hrEvent heart-rate half of the event pair that was stored
 * @param bpEvent blood-pressure half of the event pair that was stored
 * @param time    epoch milliseconds recorded together with the pair
 */
case class StateRecord(
  hrEvent: HeartRateEvent,
  bpEvent: BloodPressureEvent,
  time: Long
)
// Persistent key-value store named "Limiter": keys use `integerSerde`,
// values are `StateRecord`s serialized through `JsonSerde`.
val countStore =
  Stores
    .create("Limiter")
    .withKeys(integerSerde)
    .withValues(new JsonSerde[StateRecord])
    .persistent()
    .build()

// Register the store with the topology builder so that processors can
// look it up by name.
builder.addStateStore(countStore)
/**
 * Per-key rate limiter for (heart rate, blood pressure) event pairs.
 *
 * For every key the transformer remembers, in the "Limiter" state store, the
 * last pair it let through together with the wall-clock time at which it did
 * so. A new pair for the same key is forwarded only when strictly more than
 * `minIntervalMillis` have elapsed since that time; otherwise it is dropped
 * by returning `null`. Dropped pairs do not refresh the stored time.
 *
 * NOTE: the clock is `System.currentTimeMillis()`, i.e. processing time, not
 * the timestamp carried by the record.
 *
 * @param minIntervalMillis minimum spacing, in milliseconds, between two
 *                          forwarded records of the same key (15 seconds by
 *                          default)
 */
class Limiter(minIntervalMillis: Long = 15.seconds.toMillis)
  extends Transformer[Integer, (HeartRateEvent, BloodPressureEvent), KeyValue[Integer, (HeartRateEvent, BloodPressureEvent)]] {

  // Both fields are assigned in `init`; they hold null until then.
  var context: ProcessorContext = null
  var store: KeyValueStore[Integer, StateRecord] = null

  /** Captures the processor context and looks up the "Limiter" state store. */
  override def init(context: ProcessorContext): Unit = {
    this.context = context
    this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
  }

  /**
   * Forwards the pair if the key has never been seen or its last forwarded
   * pair is older than `minIntervalMillis`; otherwise suppresses it.
   *
   * @param key   record key the limit is tracked for
   * @param value the (heart rate, blood pressure) pair to rate-limit
   * @return the unchanged key/value as a `KeyValue`, or `null` when the
   *         record is suppressed
   */
  override def transform(
    key: Integer,
    value: (HeartRateEvent, BloodPressureEvent)
  ): KeyValue[Integer, (HeartRateEvent, BloodPressureEvent)] = {
    val now = System.currentTimeMillis()

    // `store.get` yields null for an unknown key; Option(...) maps that to
    // None, and `forall` on None is true, so first-time keys pass through.
    val windowElapsed =
      Option(store.get(key)).forall(previous => previous.time + minIntervalMillis < now)

    if (windowElapsed) {
      store.put(key, StateRecord(value._1, value._2, now))
      // Build the KeyValue explicitly rather than relying on an implicit
      // tuple conversion being in scope.
      new KeyValue(key, value)
    } else {
      null
    }
  }

  /** No periodic output: nothing is emitted on punctuation. */
  override def punctuate(timestamp: Long): KeyValue[Integer, (HeartRateEvent, BloodPressureEvent)] = null

  /** Nothing to release here. */
  override def close(): Unit = {}
}
// Pass the filtered pairs through the Limiter transformer (which uses the
// "Limiter" state store), turn each surviving record into a per-user
// message, and print the resulting stream.
filteredEvents
  .transform(() => new Limiter(), "Limiter")
  .map { (userId: Integer, _: (HeartRateEvent, BloodPressureEvent)) =>
    new KeyValue(userId, s"User $userId has a problem")
  }
  .print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment