// Apache Beam (Java): window both event streams and key them by user ID
SlidingWindows window = SlidingWindows.of(Duration.standardMinutes(60)).every(Duration.standardMinutes(5));
// create the sources and map them into POJOs (and window them by the sliding window)
PCollection<KV<Integer, BloodPressureEvent>> bpEvents =
    p.apply(Create.of(bloodPressureRawEvents))
        .apply(ParDo.of(new BloodPressureEventCaster()))
        .apply(Window.into(window))
        .apply(MapElements.via((BloodPressureEvent event) -> KV.of(event.userId, event)).withOutputType(new TypeDescriptor<KV<Integer, BloodPressureEvent>>() {}));
PCollection<KV<Integer, HeartRateEvent>> hrEvents =
    p.apply(Create.of(heartRateRawEvents))
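To make the sliding-window assignment above concrete, here is a minimal plain-Java sketch of which windows a single timestamp lands in. It does not use Beam. The class `SlidingWindowSketch` and the method `windowStartsFor` are hypothetical names, and the sketch assumes that windows start at multiples of the period counted from the Unix epoch (no offset).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apache Beam.
class SlidingWindowSketch {

    // Returns the start of every window [start, start + size) that contains
    // the timestamp, newest window first. Assumes window starts are aligned
    // to multiples of `period` since the epoch.
    static List<Instant> windowStartsFor(Instant timestamp, Duration size, Duration period) {
        long ts = timestamp.toEpochMilli();
        long sizeMs = size.toMillis();
        long periodMs = period.toMillis();

        // Latest aligned window start that is not after the timestamp.
        long lastStart = ts - Math.floorMod(ts, periodMs);

        List<Instant> starts = new ArrayList<>();
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > ts - sizeMs; start -= periodMs) {
            starts.add(Instant.ofEpochMilli(start));
        }
        return starts;
    }
}
```

With a 60-minute window that advances every 5 minutes, each event belongs to 60 / 5 = 12 overlapping windows, so a downstream grouping sees every event 12 times.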
// Create the pipeline
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
// set up the events
ArrayList<String> bloodPressureRawEvents = new ArrayList<>();
bloodPressureRawEvents.add("{\"user_id\":12345,\"systolic\":90,\"diastolic\":80,\"timestamp\":\"2017-04-05T16:18:52-04:00\"}");
//...
ArrayList<String> heartRateRawEvents = new ArrayList<>();
// Kafka Streams (Scala): persistent state store backing the Limiter transformer
case class StateRecord(hrEvent: HeartRateEvent, bpEvent: BloodPressureEvent, time: Long)
val countStore = Stores.create("Limiter")
  .withKeys(integerSerde)
  .withValues(new JsonSerde[StateRecord])
  .persistent()
  .build()
builder.addStateStore(countStore)
class Limiter extends Transformer[Integer, (HeartRateEvent, BloodPressureEvent), KeyValue[Integer, (HeartRateEvent, BloodPressureEvent)]] {
// Kafka Streams (Scala): keep only joined pairs with heart rate above 100 and systolic pressure below 100
val filteredEvents: KStream[Integer, (HeartRateEvent, BloodPressureEvent)] =
  combinedEvents
    .filter((userId: Integer, tuple: (Option[HeartRateEvent], Option[BloodPressureEvent])) => {
      (userId, tuple) match {
        case (_, (Some(hrEvent: HeartRateEvent), Some(bpEvent: BloodPressureEvent))) =>
          hrEvent.heartRate > 100 && bpEvent.systolic < 100
        case _ => false
      }
    }).mapValues {
      case (Some(hrEvent: HeartRateEvent), Some(bpEvent: BloodPressureEvent)) => (hrEvent, bpEvent)
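The filter above fires only when both sides of the outer join are present and the thresholds are crossed. As a rough illustration outside Kafka Streams, the same rule can be written as a plain Java predicate. The class `AlertRuleSketch` and its nested event types are hypothetical stand-ins that carry only the fields the rule needs, not the event classes used in the snippets.

```java
import java.util.Optional;

// Hypothetical stand-in for the alert rule, independent of Kafka Streams.
class AlertRuleSketch {

    static final class HeartRate {
        final int userId;
        final int heartRate;
        HeartRate(int userId, int heartRate) {
            this.userId = userId;
            this.heartRate = heartRate;
        }
    }

    static final class BloodPressure {
        final int userId;
        final int systolic;
        BloodPressure(int userId, int systolic) {
            this.userId = userId;
            this.systolic = systolic;
        }
    }

    // True only when both readings exist, the heart rate is above 100,
    // and the systolic pressure is below 100. A missing side never alerts.
    static boolean shouldAlert(Optional<HeartRate> hr, Optional<BloodPressure> bp) {
        return hr.isPresent()
            && bp.isPresent()
            && hr.get().heartRate > 100
            && bp.get().systolic < 100;
    }
}
```

Treating a missing reading as "no alert" mirrors the `case _ => false` branch: an outer join emits half-empty pairs, and those are dropped here.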
// Kafka Streams (Scala): outer-join heart rate and blood pressure events per user within a 60-minute join window
val combinedEvents: KStream[Integer, (Option[HeartRateEvent], Option[BloodPressureEvent])] =
  heartRateEvents
    .outerJoin(
      bloodPressureEvents,
      (hrEvent, bpEvent: Option[BloodPressureEvent]) => (hrEvent, bpEvent),
      JoinWindows.of(60.minutes.toMillis),
      integerSerde,
      new JsonSerde[Option[HeartRateEvent]],
      new JsonSerde[Option[BloodPressureEvent]]
// Kafka Streams (Scala): read the raw topics, parse the events, and key them by user ID
val bloodPressureStream: KStream[String, String] = builder.stream("rawBloodPressureEvents")
val heartRateStream: KStream[String, String] = builder.stream("rawHeartRateEvents")
val bloodPressureEvents: KStream[Integer, Option[BloodPressureEvent]] =
  bloodPressureStream
    .mapValues(toEvent[BloodPressureEvent])
    .map((_, v: BloodPressureEvent) => (v.userId, Some(v)))
val heartRateEvents: KStream[Integer, Option[HeartRateEvent]] =
  heartRateStream
// Akka Streams (Scala): rate-limit alerts per user, keeping the time of each user's last alert in a map
val rateLimiter =
  Flow[(Integer, String)]
    .statefulMapConcat { () =>
      val hashMap = new mutable.HashMap[Integer, (Long, String)]()
      (el) => el match {
        case (userId, alert) => {
          val current = System.currentTimeMillis()
          hashMap.get(userId) match {
            case Some((oldTime, oldAlert)) => {
              if (oldTime + 15.seconds.toMillis < current) {
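The rate limiter above is cut off after its time comparison, so the following is only a sketch of one plausible completion, written in plain Java without Akka. The class `AlertRateLimiter` and the method `shouldEmit` are hypothetical names. Two behaviours are assumptions and are not visible in the snippet: the first alert for a user is let through, and the stored time is updated only when an alert is let through.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-user rate limiter, independent of Akka Streams.
class AlertRateLimiter {

    private final long minIntervalMillis;
    // Time (in epoch milliseconds) of the last alert emitted per user.
    private final Map<Integer, Long> lastEmitted = new HashMap<>();

    AlertRateLimiter(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    // Returns true when the alert may pass. Assumption: a user's first alert
    // always passes, and suppressed alerts do not reset the timer.
    boolean shouldEmit(int userId, long nowMillis) {
        Long last = lastEmitted.get(userId);
        // Same strict comparison as the snippet: oldTime + interval < current.
        if (last == null || last + minIntervalMillis < nowMillis) {
            lastEmitted.put(userId, nowMillis);
            return true;
        }
        return false;
    }
}
```

Passing the current time as a parameter, rather than calling `System.currentTimeMillis()` inside the method, keeps the sketch deterministic and easy to test. Like the `HashMap` in the snippet, the map grows with the number of distinct users and is never pruned.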
val eventsByWindowAndUser = Flow[Event]
  // Window the stream by creating WindowCommands
  .statefulMapConcat { () =>
    val generator = new CommandGenerator()
    ev => generator.forEvent(ev, 10.seconds, 1.second)
  }
  // Group by window
  .groupBy(164, _.w)
  // "Finish" each window group when we detect a CloseWindow
  .takeWhile(!_.isInstanceOf[CloseWindow])
// Akka Streams (Scala): merge both sources into one stream of (blood pressure, heart rate) list pairs
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val merger = b.add(Merge[(List[BloodPressureEvent], List[HeartRateEvent])](2))
  bloodPressureSource.map(toEvent[BloodPressureEvent]).map(event => (List(event), List())) ~> merger.in(0)
  heartRateSource.map(toEvent[HeartRateEvent]).map(event => (List(), List(event))) ~> merger.in(1)
  merger ~> Sink.foreach(println)
// Akka Streams (Scala): actor system setup and a helper that exposes a source's materialized value as a Future
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
  val p = Promise[M]
  val s = src.mapMaterializedValue { m =>
    p.trySuccess(m)
    m
  }
  (s, p.future)
}