Skip to content

Instantly share code, notes, and snippets.

@Jayasagar
Last active July 7, 2017 04:28
Show Gist options
  • Save Jayasagar/3a23504c7f94d8a30e02c7f82d43e735 to your computer and use it in GitHub Desktop.
Save Jayasagar/3a23504c7f94d8a30e02c7f82d43e735 to your computer and use it in GitHub Desktop.
Build the Partition Graph
// Build the Partition Graph
Graph<ClosedShape, Consumer.Control> completionStageGraph = GraphDSL.create(sourceStream, (builder, sourceShape) -> {
UniformFanOutShape<EventMessage, EventMessage> eventTypeFanOut = builder.add(eventTypePartition);
Outlet<EventMessage> unknownEventOutlet = eventTypeFanOut.out(0);
Outlet<EventMessage> firstPartitionOutlet = eventTypeFanOut.out(1);
Outlet<EventMessage> secondPartitionOutlet = eventTypeFanOut.out(2);
builder.from(sourceShape)
.toFanOut(eventTypeFanOut)
.from(unknownEventOutlet)
.to(builder.add(commitOnlySink))
.from(firstPartitionOutlet)
.via(builder.add(firstPartitionFlow))
.via(builder.add(produceMessageFlow))
.to(builder.add((producerCommitSink)))
.from(secondPartitionOutlet)
.via(builder.add(secondPartitionFlow))
.to(builder.add(commitOnlySink));
return ClosedShape.getInstance();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment