// Route each event to one of three outlets by type; outlet 0 takes unrecognized types.
Graph<UniformFanOutShape<EventMessage, EventMessage>, NotUsed> eventTypePartition = Partition.create(3,
        (Function<EventMessage, Object>) tuple -> {
            switch (tuple.message) {
                case "EVENT_TYPE_1":
                    return 1;
                case "EVENT_TYPE_2":
                    return 2;
                default:
                    return 0;
            }
        });
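The routing rule inside the partitioner can be exercised without Akka. The following is a minimal sketch, assuming an `EventMessage` with a public `message` field as the lambda above suggests; the class here is a hypothetical stand-in, not the original type, and `outletFor` is a name introduced only for illustration.

```java
public class EventTypePartitioner {
    // Hypothetical stand-in for the EventMessage class used by the stream.
    public static final class EventMessage {
        public final String message;

        public EventMessage(String message) {
            this.message = message;
        }
    }

    // Same mapping as the partitioner lambda: outlet 0 collects unknown types.
    public static int outletFor(EventMessage event) {
        switch (event.message) {
            case "EVENT_TYPE_1":
                return 1;
            case "EVENT_TYPE_2":
                return 2;
            default:
                return 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(outletFor(new EventMessage("EVENT_TYPE_1"))); // 1
        System.out.println(outletFor(new EventMessage("EVENT_TYPE_2"))); // 2
        System.out.println(outletFor(new EventMessage("SOMETHING_ELSE"))); // 0
    }
}
```

Every returned index must stay below the outlet count passed to `Partition.create` (3 here), and a `null` message would make the `switch` throw a `NullPointerException`, so a null check may be worth adding if that can occur.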
// Build the Partition Graph
Graph<ClosedShape, Consumer.Control> completionStageGraph = GraphDSL.create(sourceStream, (builder, sourceShape) -> {
    UniformFanOutShape<EventMessage, EventMessage> eventTypeFanOut = builder.add(eventTypePartition);
    Outlet<EventMessage> unknownEventOutlet = eventTypeFanOut.out(0);
    Outlet<EventMessage> firstPartitionOutlet = eventTypeFanOut.out(1);
    Outlet<EventMessage> secondPartitionOutlet = eventTypeFanOut.out(2);
    builder.from(sourceShape)
            .toFanOut(eventTypeFanOut)
//#errorHandlingClosedRunnableGraph
// When the stream shuts down, stop this actor so that the BackoffSupervisor
// restarts it, and with it the pipeline/stream.
RunnableGraph.fromGraph(completionStageGraph)
        .run(materializer)
        .isShutdown()
        .whenComplete((done, throwable) -> {
            getSelf().tell(PoisonPill.getInstance(), getSelf());
        });
package sample.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.function.Function;
@Getter @Setter @Builder @ToString(of = {"name", "city"})
public class Consumer {
    private String name;
    private String city;
    private int age;
    private List<Thing> things;
}

@Setter @Getter @Builder @ToString(of = {"name"})
public class Thing { // Device in IoT
List<Event> bulbEvents = Arrays.asList(Event.builder().name("switchedOn").isActive(true).build(), Event.builder().name("switchedOff").isActive(true).build());
List<Event> motionDetectorEvents = Arrays.asList(Event.builder().name("motionCaptured").isActive(true).build());
List<Event> eyeVirusEvents = Arrays.asList(Event.builder().name("virusFound").isActive(true).build());
List<Event> waterLevelsEvents = Arrays.asList(Event.builder().name("dryField").isActive(true).build());
Thing bulb = Thing.builder().name("Bulb").cost(120).isRunning(true).type(Type.HOME).events(bulbEvents).build();
Thing motionDetector = Thing.builder().name("MotionDetector").cost(60).isRunning(true).type(Type.HOME).events(motionDetectorEvents).build();
Thing eyeVirusDetector = Thing.builder().name("EyeVirusDetector").cost(200).isRunning(true).type(Type.HEALTH).events(eyeVirusEvents).build();
// The field sensor reports water-level events, not the eye-virus events.
Thing agriFieldSensor = Thing.builder().name("AgriFieldSensor").cost(300).isRunning(true).type(Type.FIELD).events(waterLevelsEvents).build();
Consumer ma
List<Consumer> consumersByAge =
        consumerList
                .stream()
                // filter is an intermediate operation
                .filter(consumer -> consumer.getAge() > 30)
                // collect is a terminal operation
                .collect(Collectors.toList());
System.out.println(consumersByAge);
// Output: [Consumer(name=Sri, city=Hyd), Consumer(name=Bob, city=Berlin)]
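The intermediate/terminal distinction in the comments above matters because intermediate operations are lazy: the `filter` predicate does not run until a terminal operation such as `collect` pulls elements through the pipeline. A minimal sketch of that behavior, using plain integers as hypothetical ages instead of the `Consumer` objects, and a counter introduced only to observe the predicate:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyFilterDemo {
    // Counts how often the filter predicate has been evaluated.
    public static final AtomicInteger predicateCalls = new AtomicInteger();

    // Builds the pipeline but does not run it: filter is intermediate.
    public static Stream<Integer> olderThan30(List<Integer> ages) {
        return ages.stream().filter(age -> {
            predicateCalls.incrementAndGet();
            return age > 30;
        });
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(25, 35, 42); // hypothetical sample ages
        Stream<Integer> pipeline = olderThan30(ages);
        System.out.println(predicateCalls.get()); // 0, nothing has been filtered yet

        // collect is terminal, so the predicate now runs once per element.
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println(result); // [35, 42]
        System.out.println(predicateCalls.get()); // 3
    }
}
```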
Map<String, List<Thing>> consumerDevicesByAge =
        consumerList
                .stream()
                // filter is an intermediate operation
                .filter(consumer -> consumer.getAge() > 30)
                // collect is a terminal operation
                .collect(Collectors.toMap(Consumer::getName, Consumer::getThings));
// Consumer::getName is a method reference, a shorthand form of a lambda,
// equivalent to consumer -> consumer.getName().
System.out.println(consumerDevicesByAge);
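One caveat with the two-argument `Collectors.toMap` used above: it throws `IllegalStateException` when two elements map to the same key, for example two consumers with the same name. The three-argument overload takes a merge function for that case. A minimal sketch, assuming a hypothetical `Ownership` row type (owner name plus device name) in place of the original `Consumer` and `Thing` classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapMergeDemo {
    // Hypothetical row type: one device owned by one person.
    public static final class Ownership {
        private final String owner;
        private final String device;

        public Ownership(String owner, String device) {
            this.owner = owner;
            this.device = device;
        }

        public String getOwner() { return owner; }
        public String getDevice() { return device; }
    }

    // Two-argument toMap: fails if an owner appears more than once.
    public static Map<String, String> deviceByOwnerStrict(List<Ownership> rows) {
        return rows.stream()
                .collect(Collectors.toMap(Ownership::getOwner, Ownership::getDevice));
    }

    // Three-argument toMap: the merge function combines values for duplicate keys.
    public static Map<String, List<String>> devicesByOwner(List<Ownership> rows) {
        return rows.stream().collect(Collectors.toMap(
                Ownership::getOwner,
                row -> new ArrayList<String>(Collections.singletonList(row.getDevice())),
                (left, right) -> {
                    left.addAll(right);
                    return left;
                }));
    }

    public static void main(String[] args) {
        List<Ownership> rows = Arrays.asList(
                new Ownership("Bob", "Bulb"),
                new Ownership("Sri", "MotionDetector"),
                new Ownership("Bob", "AgriFieldSensor"));

        System.out.println(devicesByOwner(rows).get("Bob")); // [Bulb, AgriFieldSensor]

        try {
            deviceByOwnerStrict(rows);
        } catch (IllegalStateException e) {
            System.out.println("duplicate key rejected");
        }
    }
}
```

When the goal is simply one list per key, `Collectors.groupingBy` is usually the more direct choice; the merge function is shown here because the original snippet uses `toMap`.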
// Sort Bob's devices by name
List<Thing> bobDevicesSortByName = bob.getThings()
        .stream()
        // sorted is an intermediate operation
        .sorted(Comparator.comparing(Thing::getName))
        .collect(Collectors.toList());
System.out.println(bobDevicesSortByName);
// Output: [Thing(name=AgriFieldSensor), Thing(name=Bulb), Thing(name=EyeVirusDetector), Thing(name=MotionDetector)]
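The same `sorted` step extends to multi-key ordering by chaining comparators. A minimal sketch, assuming a hypothetical `Device` class with `name` and `cost` (standing in for `Thing`); the costs match the sample data earlier in this listing, and the extra "Alarm" device is invented here only to show the tie-break:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortDevicesDemo {
    // Hypothetical stand-in for Thing, reduced to the two fields being sorted on.
    public static final class Device {
        private final String name;
        private final int cost;

        public Device(String name, int cost) {
            this.name = name;
            this.cost = cost;
        }

        public String getName() { return name; }
        public int getCost() { return cost; }
    }

    // Most expensive first; devices with equal cost are ordered by name.
    public static List<String> namesByCostDescThenName(List<Device> devices) {
        return devices.stream()
                .sorted(Comparator.comparingInt(Device::getCost).reversed()
                        .thenComparing(Device::getName))
                .map(Device::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Device> devices = Arrays.asList(
                new Device("Bulb", 120),
                new Device("MotionDetector", 60),
                new Device("EyeVirusDetector", 200),
                new Device("AgriFieldSensor", 300),
                new Device("Alarm", 120)); // hypothetical, ties with Bulb on cost

        System.out.println(namesByCostDescThenName(devices));
        // [AgriFieldSensor, EyeVirusDetector, Alarm, Bulb, MotionDetector]
    }
}
```

Note that `reversed()` applies to everything built before it in the chain, so it is placed before `thenComparing` here to keep the name tie-break ascending.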
List<String> uniqueCities = consumerList
        .stream()
        .map(Consumer::getCity)
        .distinct()
        .collect(Collectors.toList());
System.out.println(uniqueCities);
// Output: [Hyd, Berlin]