Skip to content

Instantly share code, notes, and snippets.

// Sliding window configured with a 60-minute size and a 5-minute period.
SlidingWindows window = SlidingWindows.of(Duration.standardMinutes(60)).every(Duration.standardMinutes(5));
// create the sources and map them into POJOs (and window them by the sliding window)
// Blood-pressure branch: raw events -> BloodPressureEventCaster -> sliding window -> keyed by userId.
PCollection<KV<Integer, BloodPressureEvent>> bpEvents =
p.apply(Create.of(bloodPressureRawEvents))
.apply(ParDo.of(new BloodPressureEventCaster()))
.apply(Window.into(window))
// The explicit TypeDescriptor states the lambda's output type
// (presumably because it cannot be inferred from the lambda itself - TODO confirm).
.apply(MapElements.via((BloodPressureEvent event) -> KV.of(event.userId, event)).withOutputType(new TypeDescriptor<KV<Integer, BloodPressureEvent>>() {}));
// Heart-rate branch; the remainder of this chain is not visible in this excerpt.
PCollection<KV<Integer, HeartRateEvent>> hrEvents =
p.apply(Create.of(heartRateRawEvents))
// Create the pipeline
// Options come from PipelineOptionsFactory.create() with no arguments supplied here.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// set up the events
// Each raw event is a JSON string; the visible sample carries user_id, systolic,
// diastolic and a timestamp with a UTC offset.
ArrayList<String> bloodPressureRawEvents = new ArrayList<>();
bloodPressureRawEvents.add("{\"user_id\":12345,\"systolic\":90,\"diastolic\":80,\"timestamp\":\"2017-04-05T16:18:52-04:00\"}");
//...
// Raw heart-rate events; the lines that populate this list are not visible in this excerpt.
ArrayList<String> heartRateRawEvents = new ArrayList<>();
// Value type kept in the "Limiter" state store: a heart-rate / blood-pressure event pair plus a Long `time`.
// NOTE(review): `time` is presumably an epoch-millis timestamp written by the Limiter transformer - confirm.
case class StateRecord(hrEvent: HeartRateEvent, bpEvent: BloodPressureEvent, time: Long)
// Persistent store named "Limiter", keyed with integerSerde and holding StateRecord values via JsonSerde.
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.build();
// Register the store with the topology builder.
builder.addStateStore(countStore)
// Transformer over Integer-keyed (HeartRateEvent, BloodPressureEvent) pairs that emits KeyValue records
// of the same key/value types. Only the header is visible in this excerpt.
// NOTE(review): the name and the "Limiter" state store suggest per-key rate limiting - confirm against the body.
class Limiter extends Transformer[Integer, (HeartRateEvent, BloodPressureEvent), KeyValue[Integer, (HeartRateEvent, BloodPressureEvent)]] {
// Keeps only joined records where both events are present, heartRate > 100 and systolic < 100,
// then unwraps the Options into a plain (HeartRateEvent, BloodPressureEvent) tuple.
val filteredEvents: KStream[Integer, (HeartRateEvent, BloodPressureEvent)] =
combinedEvents
.filter((userId: Integer, tuple: (Option[HeartRateEvent], Option[BloodPressureEvent])) => {
(userId, tuple) match {
case (_, (Some(hrEvent: HeartRateEvent), Some(bpEvent: BloodPressureEvent))) =>
hrEvent.heartRate > 100 && bpEvent.systolic < 100
// Records missing either side of the join are dropped.
case _ => false
}
}).mapValues {
// Only the both-present shape reaches this point because the filter above rejects everything else.
// The rest of this block is not visible in this excerpt.
case (Some(hrEvent: HeartRateEvent), Some(bpEvent: BloodPressureEvent)) => (hrEvent, bpEvent)
// Outer-joins heart-rate events with blood-pressure events per key, pairing the two sides as Options.
// The closing of this call is not visible in this excerpt.
val combinedEvents: KStream[Integer, (Option[HeartRateEvent], Option[BloodPressureEvent])] =
heartRateEvents
.outerJoin(
bloodPressureEvents,
// Joiner: simply pairs the two sides into a tuple.
(hrEvent, bpEvent: Option[BloodPressureEvent]) => (hrEvent, bpEvent),
// 60-minute join window, expressed in milliseconds.
JoinWindows.of(60.minutes.toMillis),
// Serdes for the key and for each side's value type.
integerSerde,
new JsonSerde[Option[HeartRateEvent]],
new JsonSerde[Option[BloodPressureEvent]]
// Raw String-keyed, String-valued streams obtained from the builder by name.
val bloodPressureStream: KStream[String, String] = builder.stream("rawBloodPressureEvents")
val heartRateStream: KStream[String, String] = builder.stream("rawHeartRateEvents")
// Converts each raw value with toEvent, then re-keys by the event's userId and wraps the value in Some.
// NOTE(review): the Option wrapper presumably exists so the outer join can represent a missing side - confirm.
val bloodPressureEvents: KStream[Integer, Option[BloodPressureEvent]] =
bloodPressureStream
.mapValues(toEvent[BloodPressureEvent])
.map((_, v: BloodPressureEvent) => (v.userId, Some(v)))
// Heart-rate counterpart; the remainder of this chain is not visible in this excerpt.
val heartRateEvents: KStream[Integer, Option[HeartRateEvent]] =
heartRateStream
// Flow stage over (userId, alert) pairs that keeps per-user state in a mutable map.
// NOTE(review): the name and the 15-second comparison suggest per-user alert throttling; the branches
// that emit or suppress elements are not visible in this excerpt - confirm.
val rateLimiter =
Flow[(Integer, String)]
.statefulMapConcat { () =>
// State created inside the statefulMapConcat factory function: userId -> (Long, String).
// Presumably the time and text of a previously handled alert - TODO confirm.
val hashMap = new mutable.HashMap[Integer, (Long, String)]()
(el) => el match {
case (userId, alert) => {
// Wall-clock time used for the elapsed-time comparison below.
val current = System.currentTimeMillis()
hashMap.get(userId) match {
case Some((oldTime, oldAlert)) => {
// True once more than 15 seconds have passed since oldTime.
if (oldTime + 15.seconds.toMillis < current) {
// Splits the event stream into one substream per window and ends each substream
// when that window's CloseWindow command arrives.
val eventsByWindowAndUser = Flow[Event]
  // Expand every event into window commands. The generator is built inside the
  // stage's factory function and reused for every element handled by that function.
  .statefulMapConcat { () =>
    val commands = new CommandGenerator()
    event => commands.forEvent(event, 10.seconds, 1.second)
  }
  // One substream per distinct `w`, with 164 passed as the substream limit.
  .groupBy(164, command => command.w)
  // Keep elements until the first CloseWindow command; that command itself is not emitted.
  .takeWhile {
    case _: CloseWindow => false
    case _              => true
  }
// Graph that merges both sources into one stream of (List[BloodPressureEvent], List[HeartRateEvent])
// tuples and prints every element. The closing of this definition is not visible in this excerpt.
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
// Two-input merge stage over the shared tuple type.
val merger = b.add(Merge[(List[BloodPressureEvent], List[HeartRateEvent])](2))
// Each event is wrapped so both inputs share one element type:
// blood-pressure events fill the left list, heart-rate events the right list.
bloodPressureSource.map(toEvent[BloodPressureEvent]).map(event => (List(event), List())) ~> merger.in(0)
heartRateSource.map(toEvent[HeartRateEvent]).map(event => (List(), List(event))) ~> merger.in(1)
merger ~> Sink.foreach(println)
// Actor system named "QuickStart"; implicit so APIs that need one can resolve it from scope.
// Implicit vals carry explicit types so resolution does not depend on inference order
// (and so the definitions remain valid under Scala 3, which requires the annotation).
implicit val system: ActorSystem = ActorSystem("QuickStart")
// Materializer made available implicitly to stream-running calls in scope.
implicit val materializer: ActorMaterializer = ActorMaterializer()
/** Exposes a source's materialized value through a Future.
  *
  * @param src the source whose materialized value should be observed
  * @tparam T element type of the source
  * @tparam M materialized value type of the source
  * @return a pair of (source with the same element and materialized types as `src`,
  *         future completed with the materialized value passed through `mapMaterializedValue`)
  */
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
// trySuccess instead of success: if the promise is already completed this is a no-op
// rather than an exception, so only the first value is kept.
p.trySuccess(m)
// Pass the materialized value through unchanged.
m
}
(s, p.future)