Created
November 9, 2016 08:19
-
-
Save Horusiath/d34f92f6a9fe9f5ac3fa687be489245f to your computer and use it in GitHub Desktop.
Merging two event streams from two persistent actors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.Immutable; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Persistence.Query; | |
using Akka.Persistence.Query.Sql; | |
using Akka.Streams; | |
using Akka.Streams.Dsl; | |
namespace StreamsPlayground
{
    /// <summary>
    /// Sample program that merges the event streams of two persistent actors
    /// into a single list using an Akka.Streams graph.
    /// </summary>
    internal static class Program
    {
        /// <summary>
        /// Entry point: runs the merge and prints every collected event.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        private static void Main(string[] args)
        {
            // Block with GetAwaiter().GetResult() rather than .Result so that a failure
            // surfaces as the original exception instead of an AggregateException.
            var events = MergeStreams().GetAwaiter().GetResult();

            foreach (var e in events)
            {
                Console.WriteLine(e);
            }
        }

        /// <summary>
        /// Builds and runs a graph that merges the events of "persistence-id-1" and
        /// "persistence-id-2", extracts the event payloads and aggregates them into a list.
        /// </summary>
        /// <returns>
        /// A task that completes with the payloads of all merged events, in the order
        /// in which the merge stage emitted them.
        /// </returns>
        private static async Task<ImmutableList<object>> MergeStreams()
        {
            using (var system = ActorSystem.Create("system"))
            using (var materializer = system.Materializer())
            {
                var journal = system.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);

                // create a sink where all results will be pushed; it materializes into
                // a task holding the aggregated list
                var sink = Sink.Aggregate<object, ImmutableList<object>>(
                    ImmutableList<object>.Empty,
                    (list, e) => list.Add(e));

                // construct a graph that will merge event sources from both persistent actors,
                // perform some operation on them and push the results to the sink.
                // The sink is handed to GraphDsl.Create (instead of being added inside the
                // builder) so that its materialized value - the result task - becomes
                // the materialized value of the whole graph.
                var graph = RunnableGraph.FromGraph(
                    GraphDsl.Create(sink, (builder, sinkShape) =>
                    {
                        // get the stream of events for both persistent actors
                        var eventSourceA = builder.Add(
                            journal.CurrentEventsByPersistenceId("persistence-id-1", 0, long.MaxValue));
                        var eventSourceB = builder.Add(
                            journal.CurrentEventsByPersistenceId("persistence-id-2", 0, long.MaxValue));

                        var merge = builder.Add(new Merge<EventEnvelope>(2));

                        // construct a flow that will do something on the merged stream
                        var flow = Flow.Create<EventEnvelope>().Select(e => e.Event);

                        // combine everything - this will create a graph that looks like this:
                        //
                        // +--------------+
                        // | eventSourceA |--+
                        // +--------------+  |   +-------+   +--------+   +----------------+
                        //                   +-->| Merge |-->| Select |-->| Aggregate sink |
                        // +--------------+  |   +-------+   +--------+   +----------------+
                        // | eventSourceB |--+
                        // +--------------+
                        builder.From(eventSourceA).Via(merge).Via(flow).To(sinkShape);
                        builder.From(eventSourceB).To(merge);

                        return ClosedShape.Instance;
                    }));

                // run the graph and await the result before the actor system is disposed
                return await graph.Run(materializer);
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment