Last active
January 5, 2017 11:30
-
-
Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
Spiking an async OrigoDB engine using Akka.NET and Event Store v3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Actor that runs batches of commands against the kernel, one command at a
/// time, and sends each result back to the actor reference paired with it.
/// </summary>
public class Executor : ReceiveActor
{
    readonly Kernel _kernel;

    /// <summary>
    /// Registers the batch handler for arrays of (command, reply-to) pairs.
    /// </summary>
    /// <param name="kernel">The kernel every incoming command is executed against.</param>
    public Executor(Kernel kernel)
    {
        _kernel = kernel;
        Receive<Tuple<Command,ActorRef>[]>(ExecuteCommands);
    }

    /// <summary>
    /// Executes every command of the batch in array order and replies to its caller.
    /// </summary>
    /// <param name="tuples">Pairs of a command and the actor reference awaiting its result.</param>
    /// <returns>Always <c>true</c>.</returns>
    private bool ExecuteCommands(Tuple<Command,ActorRef>[] tuples)
    {
        for (var index = 0; index < tuples.Length; index++)
        {
            var command = tuples[index].Item1;
            var replyTo = tuples[index].Item2;

            // The reply goes to the external caller and correlates with the
            // call to Ask<>() in Prevayler.ExecuteAsync().
            replyTo.Tell(_kernel.Execute(command), Context.Parent);
        }

        return true;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Append multiple commands accumulated during a specific time period or up
/// to a specific limit.
/// </summary>
/// <remarks>
/// Incoming (command, reply-to) pairs are buffered. The buffer is written to
/// the event store and then forwarded to the executor as soon as it holds
/// <see cref="BatchSize"/> items, or when the receive timeout fires, whichever
/// comes first.
/// </remarks>
public class JournalWriter : ReceiveActor
{
    private const int DefaultBatchSize = 100;

    // Used when no interval is supplied. Previously Interval was never assigned,
    // so the receive timeout was configured with default(TimeSpan), i.e. zero.
    private static readonly TimeSpan DefaultInterval = TimeSpan.FromMilliseconds(10);

    private readonly IEventStoreConnection _eventStore;

    // NOTE(review): created but not used while the serialisation in ToEventData is disabled.
    private readonly IFormatter _formatter;

    //number of commands at a time to journal
    public int BatchSize = DefaultBatchSize;

    //or after a specific time elapsed, whichever comes first
    //NOTE: only read once, in the constructor; changing it afterwards does not
    //reconfigure the receive timeout.
    public TimeSpan Interval = DefaultInterval;

    //buffered commands waiting to be written to the journal
    readonly List<Tuple<Command,ActorRef>> _commandBuffer = new List<Tuple<Command,ActorRef>>();

    //pass on the journaled commands to this actor
    readonly ActorRef _executor;

    //placeholder payload written for every event, see ToEventData
    readonly byte[] _bytes = new byte[200];

    /// <summary>
    /// Creates a journal writer with the default batch size and interval.
    /// </summary>
    /// <remarks>
    /// Declared as an overload rather than an optional parameter so that it can be
    /// used inside a <c>Props.Create(() =&gt; new JournalWriter(executor))</c> expression.
    /// </remarks>
    /// <param name="executor">Actor that receives each batch after it has been journaled.</param>
    public JournalWriter(ActorRef executor)
        : this(executor, DefaultBatchSize)
    {
    }

    /// <summary>
    /// Creates a journal writer with the given batch size and the default interval.
    /// </summary>
    /// <param name="executor">Actor that receives each batch after it has been journaled.</param>
    /// <param name="batchSize">Number of buffered commands that triggers a write.</param>
    public JournalWriter(ActorRef executor, int batchSize)
        : this(executor, batchSize, DefaultInterval)
    {
    }

    /// <summary>
    /// Creates a journal writer, registers its message handlers and connects to
    /// the event store at 127.0.0.1:1113.
    /// </summary>
    /// <param name="executor">Actor that receives each batch after it has been journaled.</param>
    /// <param name="batchSize">Number of buffered commands that triggers a write.</param>
    /// <param name="interval">Receive timeout after which a partially filled buffer is written.</param>
    public JournalWriter(ActorRef executor, int batchSize, TimeSpan interval)
    {
        BatchSize = batchSize;
        Interval = interval;
        _executor = executor;

        Receive<Tuple<Command, ActorRef>>(Accept);
        SetReceiveTimeout(Interval);
        Receive<ReceiveTimeout>(HandleTimeout);

        _eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
        // Blocks the constructor until the connect task has completed.
        _eventStore.ConnectAsync().Wait();
        _formatter = new BinaryFormatter();
    }

    /// <summary>
    /// Writes whatever is buffered when the receive timeout fires.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    public bool HandleTimeout(ReceiveTimeout _)
    {
        Go();
        return true;
    }

    /// <summary>
    /// Appends the buffered commands to the "akka" stream, forwards them to the
    /// executor and empties the buffer. Does nothing when the buffer is empty.
    /// </summary>
    private void Go()
    {
        if (_commandBuffer.Count == 0)
        {
            return;
        }

        //Console.WriteLine("JOURNALER: Writing {0} commands", _commandBuffer.Count);

        // Wait for the append to finish so that commands are only handed to the
        // executor after the write has completed.
        _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
            _commandBuffer.Select(ToEventData).ToArray()).Wait();

        //pass on for execution
        _executor.Tell(_commandBuffer.ToArray());
        _commandBuffer.Clear();
    }

    /// <summary>
    /// Builds the event written to the journal for one buffered command.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the command is NOT serialised. Every event carries the same
    /// 200 byte placeholder buffer, so the journal cannot be replayed. The
    /// intended implementation appears to be the disabled lines below - confirm
    /// before relying on the journal content.
    /// </remarks>
    private EventData ToEventData(Tuple<Command, ActorRef> arg)
    {
        var id = Guid.NewGuid();
        //var stream = new MemoryStream();
        //_formatter.Serialize(stream, arg.Item1);
        return new EventData(id, "akka", false, _bytes, null);
    }

    /// <summary>
    /// Buffers an incoming command and writes the batch once it is full.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    public bool Accept(Tuple<Command, ActorRef> command)
    {
        _commandBuffer.Add(command);

        // ">=" rather than "==": BatchSize is a public mutable field, and an
        // exact match would never trigger again once the buffer has grown past it.
        if (_commandBuffer.Count >= BatchSize)
        {
            Go();
        }

        return true;
    }

    /// <summary>
    /// Closes the event store connection when the actor stops.
    /// </summary>
    protected override void PostStop()
    {
        base.PostStop();
        _eventStore.Close();
        Console.WriteLine("PostStop called");
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Executes 10000 AddItemCommands asynchronously against a fresh engine and
/// prints the elapsed time.
/// </summary>
/// <param name="batchSize">
/// Batch size label printed with the result.
/// NOTE(review): the value is only written to the console here; it is not
/// passed to the Prevayler, whose visible constructor takes the model only.
/// </param>
[Test]
public void Smoke(int batchSize = 100)
{
    Console.WriteLine("Batch size: " + batchSize);
    var sw = new Stopwatch();

    // using: the actor system is shut down even when a command task faults
    // and Task.WaitAll throws.
    using (var prevayler = new Prevayler<List<string>>(new List<string>()))
    {
        sw.Start();
        var tasks = Enumerable
            .Range(0, 10000)
            .Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString()))).ToArray();
        Task.WaitAll(tasks);
        sw.Stop();
        Console.WriteLine("async elapsed: " + sw.Elapsed);
    }
}
/// <summary>
/// Runs the smoke test eight times, doubling the batch size each time:
/// 10, 20, 40, ... 1280.
/// </summary>
[Test]
public void ProgressiveBatchSizes()
{
    for (var doublings = 0; doublings < 8; doublings++)
    {
        // 10 * 2^doublings
        Smoke(10 << doublings);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Prevalence engine
/// </summary>
/// <remarks>
/// Commands are sent to a dispatcher actor, which passes them to the journaler,
/// which in turn passes them to the executor.
/// </remarks>
/// <typeparam name="M">Type of the model the commands operate on.</typeparam>
public class Prevayler<M> : IDisposable
{
    // Batch size used when the caller does not specify one.
    const int DefaultBatchSize = 100;

    readonly ActorSystem _actorSystem;
    readonly ActorRef _dispatcher;

    /// <summary>
    /// Creates an engine for <paramref name="model"/> using the default journal batch size.
    /// </summary>
    /// <param name="model">The model instance wrapped by the kernel.</param>
    public Prevayler(M model)
        : this(model, DefaultBatchSize)
    {
    }

    /// <summary>
    /// Creates an engine for <paramref name="model"/> and builds its actor chain.
    /// </summary>
    /// <param name="model">The model instance wrapped by the kernel.</param>
    /// <param name="batchSize">Number of commands the journaler writes at a time.</param>
    public Prevayler(M model, int batchSize)
    {
        // the kernel is an origodb component which
        // synchronizes reads and writes to the model
        // will be shared by command executor and query executor group
        var kernel = new Kernel(model);

        //build the chain of actors backwards
        _actorSystem = ActorSystem.Create("prevayler");

        //executor executes commands
        //it could also handle queries but would allow either a single query or command at time.
        //better to add a group of actors that can execute queries concurrently
        var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));

        //journaler writes commands to the journal in batches or at specific intervals
        //before passing to the executor
        //the batch size is passed explicitly: JournalWriter's constructor requires it
        var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor, batchSize)));

        //dispatcher prepares initial message and passes to journaler
        _dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
    }

    /// <summary>
    /// Sends a command to the dispatcher and asynchronously returns its result.
    /// </summary>
    /// <typeparam name="R">Result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task that completes with the reply to the command.</returns>
    public async Task<R> ExecuteAsync<R>(Command<M,R> command)
    {
        return await _dispatcher.Ask<R>(command);
    }

    /// <summary>
    /// Sends a command without a result to the dispatcher.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task that completes when the reply to the command arrives.</returns>
    public async Task ExecuteAsync(Command<M> command)
    {
        await _dispatcher.Ask(command);
    }

    /// <summary>
    /// Executes a command and blocks the calling thread until its result is available.
    /// </summary>
    /// <typeparam name="R">Result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>The result of the command.</returns>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>
    /// Executes a command and blocks the calling thread until it has completed.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Shuts the actor system down and waits for the shutdown to finish.
    /// </summary>
    public void Dispose()
    {
        _actorSystem.Shutdown();
        _actorSystem.WaitForShutdown();
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Batch size: 10 | |
async elapsed: 00:00:03.1852693 | |
Batch size: 20 | |
PostStop called | |
async elapsed: 00:00:01.0519764 | |
Batch size: 40 | |
PostStop called | |
async elapsed: 00:00:00.7820753 | |
Batch size: 80 | |
PostStop called | |
async elapsed: 00:00:00.8452527 | |
Batch size: 160 | |
PostStop called | |
async elapsed: 00:00:00.8752412 | |
Batch size: 320 | |
PostStop called | |
async elapsed: 00:00:00.8854354 | |
Batch size: 640 | |
PostStop called | |
[INFO][2014-10-15 11:30:53][Thread 0029][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered. | |
async elapsed: 00:00:00.9683153 | |
Batch size: 1280 | |
PostStop called | |
async elapsed: 00:00:00.8868212 | |
PostStop called |
Is there any way to get notifications of new comments here?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I would consider rethinking this bit here: