Skip to content

Instantly share code, notes, and snippets.

View Jayasagar's full-sized avatar

Jayasagar Jagirapu Jayasagar

View GitHub Profile
@Jayasagar
Jayasagar / CreatePartition.java
Last active July 7, 2017 04:44
Akka Stream Create Partition
Graph<UniformFanOutShape<EventMessage, EventMessage>, NotUsed> eventTypePartition = Partition.create(3,
(Function<EventMessage, Object>) tuple -> {
switch (tuple.message) {
case "EVENT_TYPE_1":
return 1;
case "EVENT_TYPE_2":
return 2;
default:
return 0;
}
@Jayasagar
Jayasagar / buildGraph.java
Last active July 7, 2017 04:28
Build the Partition Graph
// Build the Partition Graph
Graph<ClosedShape, Consumer.Control> completionStageGraph = GraphDSL.create(sourceStream, (builder, sourceShape) -> {
UniformFanOutShape<EventMessage, EventMessage> eventTypeFanOut = builder.add(eventTypePartition);
Outlet<EventMessage> unknownEventOutlet = eventTypeFanOut.out(0);
Outlet<EventMessage> firstPartitionOutlet = eventTypeFanOut.out(1);
Outlet<EventMessage> secondPartitionOutlet = eventTypeFanOut.out(2);
builder.from(sourceShape)
.toFanOut(eventTypeFanOut)
@Jayasagar
Jayasagar / Rungraph.java
Created July 7, 2017 04:29
Run the graph
//#errorHandlingClosedRunnableGraph
// Self kill this actor so that BackoffSupervisor starts this actor(pipeline/stream) again!
RunnableGraph.fromGraph(completionStageGraph)
.run(materializer)
.isShutdown()
.whenComplete((done, throwable) -> {
getSelf().tell(PoisonPill.getInstance(), getSelf());
});
@Jayasagar
Jayasagar / RunnableGraphStreamWrapperActor.java
Created July 7, 2017 04:30
Complete example reference to Partition Graph and Reactive Kafka
package sample.javadsl;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.function.Function;
@Jayasagar
Jayasagar / JavaStreamModal.java
Created December 17, 2017 15:03
Data model used for the examples below
@Getter @Setter @Builder @ToString(of = {"name", "city"})
public class Consumer {
private String name;
private String city;
private int age;
private List<Thing> things;
}
@Setter @Getter @Builder @ToString(of = {"name"})
public class Thing {// Device in IoT
@Jayasagar
Jayasagar / JavaStreamTestData.java
Created December 17, 2017 15:08
Java Stream Test Data
List<Event> bulbEvents = Arrays.asList(Event.builder().name("switchedOn").isActive(true).build(), Event.builder().name("switchedOff").isActive(true).build());
List<Event> motionDetectorEvents = Arrays.asList(Event.builder().name("motionCaptured").isActive(true).build());
List<Event> eyeVirusEvents = Arrays.asList(Event.builder().name("virusFound").isActive(true).build());
List<Event> waterLevelsEvents = Arrays.asList(Event.builder().name("dryField").isActive(true).build());
Thing bulb = Thing.builder().name("Bulb").cost(120).isRunning(true).type(Type.HOME).events(bulbEvents).build();
Thing motionDetector = Thing.builder().name("MotionDetector").cost(60).isRunning(true).type(Type.HOME).events(motionDetectorEvents).build();
Thing eyeVirusDetector = Thing.builder().name("EyeVirusDetector").cost(200).isRunning(true).type(Type.HEALTH).events(eyeVirusEvents).build();
Thing agriFieldSensor = Thing.builder().name("AgriFieldSensor").cost(300).isRunning(true).type(Type.FIELD).events(eyeVirusEvents).build();
Consumer ma
List<Consumer> consumersByAge =
consumerList
.stream()
// filter is an intermediate operation
.filter(consumer -> consumer.getAge() > 30)
// collect is a terminal operation
.collect(Collectors.toList());
System.out.println(consumersByAge);
// Output: [Consumer(name=Sri, city=Hyd), Consumer(name=Bob, city=Berlin)]
Map<String, List<Thing>> consumerDevicesByAge =
consumerList
.stream()
// filter is an intermediate operation
.filter(consumer -> consumer.getAge() > 30)
// collect is a terminal operation
.collect(Collectors.toMap(Consumer::getName, Consumer::getThings));
// In the above code, used 'Method reference(shorthand form of lamda)' Consumer::getName which is equalent to consumer -> consumer.getName()
System.out.println(consumerDevicesByAge);
// User Bob devices sort by name
List<Thing> bobDevicesSortByName = bob.getThings()
.stream()
// sorted is an intermediate operation
.sorted(Comparator.comparing(Thing::getName))
.collect(Collectors.toList());
System.out.println(bobDevicesSortByName);
// Output: [Thing(name=AgriFieldSensor), Thing(name=Bulb), Thing(name=EyeVirusDetector), Thing(name=MotionDetector)]
List<String> uniqueCities = consumerList
.stream()
.map(Consumer::getCity)
.distinct()
.collect(Collectors.toList());
System.out.println(uniqueCities);
// Output: [Hyd, Berlin]