Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Created November 9, 2016 08:19
Show Gist options
  • Save Horusiath/d34f92f6a9fe9f5ac3fa687be489245f to your computer and use it in GitHub Desktop.
Save Horusiath/d34f92f6a9fe9f5ac3fa687be489245f to your computer and use it in GitHub Desktop.
Merging two event streams from two persistent actors
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Streams;
using Akka.Streams.Dsl;
namespace StreamsPlayground
{
class Program
{
static void Main(string[] args)
{
var result = Task.Run(MergeStreams).Result;
}
private static async Task<ImmutableList<object>> MergeStreams()
{
using (var system = ActorSystem.Create("system"))
using (var materializer = system.Materializer())
{
var journal = system.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
// construct graph that will merge event sources from both persistent actors,
// make make some operation on them and push results to sink
var graph =
RunnableGraph.FromGraph(
GraphDsl.CreateMaterialized((GraphDsl.Builder<Task<ImmutableList<object>>> builder) =>
{
// get stream of events for both persistent actors
var eventSourceA =
builder.Add(journal.CurrentEventsByPersistenceId("persistence-id-1", 0, long.MaxValue));
var eventSourceB =
builder.Add(journal.CurrentEventsByPersistenceId("persistence-id-2", 0, long.MaxValue));
var merge = builder.Add(new Merge<EventEnvelope>(2));
// construct a flow that will do something on merged stream
var flow = Flow.Create<EventEnvelope>().Select(e => e.Event);
// create a sink, when all results will be pushed
var sink = Sink.Aggregate<object, ImmutableList<object>>(ImmutableList<object>.Empty,
(list, e) => list.Add(e));
// combine everything - this will create a graph that looks like this:
// +--------------+
// | eventSourceA |--+
// +--------------+ | +-------+ +--------+ +----------------+
// +-->| Merge |-->| Select |-->| Aggregate sink |
// +--------------+ | +-------+ +--------+ +----------------+
// | eventSourceB |--+
// +--------------+
builder.From(eventSourceA).Via(merge).Via(flow).To(sink);
builder.From(eventSourceB).To(merge);
return ClosedShape.Instance;
}));
// run graph and await for the result
return await graph.Run(materializer);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment