Skip to content

Instantly share code, notes, and snippets.

@rofr
Created October 20, 2014 16:37
Show Gist options
  • Save rofr/9d9e1cc76b7aef26d8f3 to your computer and use it in GitHub Desktop.
Save rofr/9d9e1cc76b7aef26d8f3 to your computer and use it in GitHub Desktop.
namespace AckAck
{
    /// <summary>
    /// <see cref="IJournalWriter"/> implementation that appends commands as events
    /// to the "akka" stream of an Event Store connection.
    /// </summary>
    /// <remarks>
    /// Commands are serialized with <see cref="BinaryFormatter"/>, so every command type
    /// passed to <see cref="AppendAsync"/> must be serializable by that formatter.
    /// NOTE(review): BinaryFormatter is unsafe for deserializing untrusted data; anything
    /// that reads this journal back must treat the stream as trusted input only.
    /// </remarks>
    public class EventStoreJournal : IJournalWriter
    {
        // Stream name and event type are the same literal for every journal entry.
        private const string StreamName = "akka";
        private const string EventType = "akka";

        private readonly IEventStoreConnection _eventStore;
        private readonly IFormatter _formatter;

        /// <summary>
        /// Creates a journal that writes through the given connection.
        /// </summary>
        /// <param name="connection">Connection used for all appends; closed by <see cref="Dispose"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
        public EventStoreJournal(IEventStoreConnection connection)
        {
            if (connection == null)
            {
                throw new ArgumentNullException("connection");
            }

            _formatter = new BinaryFormatter();
            _eventStore = connection;
        }

        /// <summary>
        /// Appends the given commands to the journal stream in a single write,
        /// without any expected-version check.
        /// </summary>
        /// <param name="commands">Commands to journal, in order.</param>
        /// <returns>A task that completes when the append call has completed.</returns>
        public async Task AppendAsync(IEnumerable<Command> commands)
        {
            if (commands == null)
            {
                throw new ArgumentNullException("commands");
            }

            // Serialize eagerly, on the caller's side of the await, so the shared formatter
            // is not driven by a lazily enumerated sequence from inside the client library.
            var events = commands.Select(ToEventData).ToArray();

            await _eventStore.AppendToStreamAsync(StreamName, ExpectedVersion.Any, events);
        }

        /// <summary>
        /// Wraps a single command in an event with a fresh id and the serialized command as payload.
        /// </summary>
        private EventData ToEventData(Command command)
        {
            using (var stream = new System.IO.MemoryStream())
            {
                _formatter.Serialize(stream, command);

                // Third argument is false because the payload is binary; no metadata is attached.
                return new EventData(Guid.NewGuid(), EventType, false, stream.ToArray(), null);
            }
        }

        /// <summary>
        /// Closes the underlying Event Store connection.
        /// </summary>
        public void Dispose()
        {
            _eventStore.Close();
        }
    }
}
namespace AckAck
{
    /// <summary>
    /// Append multiple commands accumulated during a specific time period or up
    /// to a specific limit.
    /// </summary>
    /// <remarks>
    /// Incoming <c>CommandContext</c> messages are buffered. The buffer is flushed to the
    /// journal when it reaches <see cref="BatchSize"/> or when a <c>ReceiveTimeout</c>
    /// message arrives. Each flushed batch is queued until a <c>JournalAcknowledgement</c>
    /// arrives, at which point the oldest queued batch is forwarded to the executor.
    /// </remarks>
    public class JournalWriter : ReceiveActor
    {
        private readonly IJournalWriter _journalWriter;

        //number of commands at a time to journal
        public int BatchSize = 100;

        //or after a specific time elapsed, whichever comes first
        //NOTE: only read once, in the constructor; changing it afterwards has no effect.
        public TimeSpan Interval = TimeSpan.FromMilliseconds(10);

        //buffered commands waiting to be written to the journal
        private readonly List<CommandContext> _commandBuffer = new List<CommandContext>(200000);

        //batches handed to the journal whose acknowledgement has not arrived yet (FIFO)
        private readonly Queue<CommandContext[]> _waitingForJournalAck = new Queue<CommandContext[]>(200000);

        //pass on the journaled commands to this actor
        private readonly ActorRef _executor;

        /// <summary>
        /// Creates the actor and registers its message handlers.
        /// </summary>
        /// <param name="executor">Actor that receives each batch once it has been acknowledged.</param>
        /// <param name="batchSize">Number of buffered commands that triggers an immediate flush.</param>
        /// <param name="journalWriter">Journal the batches are appended to.</param>
        public JournalWriter(ActorRef executor, int batchSize, IJournalWriter journalWriter)
        {
            _journalWriter = journalWriter;
            BatchSize = batchSize;
            _executor = executor;

            Receive<CommandContext>(t => Accept(t));
            Receive<ReceiveTimeout>(_ => Go());
            // Acknowledgements carry no payload, so batches are matched to them purely by
            // arrival order. NOTE(review): this assumes appends complete in the order issued.
            Receive<JournalAcknowledgement>(_ => _executor.Tell(_waitingForJournalAck.Dequeue()));
            SetReceiveTimeout(Interval);
        }

        /// <summary>
        /// Flushes the buffered commands (if any) to the journal as one batch.
        /// </summary>
        private void Go()
        {
            if (_commandBuffer.Count > 0)
            {
                // Capture Self now: the continuation below runs outside the actor's message handling.
                var self = Self;
                var batch = _commandBuffer.ToArray();
                var task = _journalWriter.AppendAsync(batch.Select(item => item.Command));
                _commandBuffer.Clear();
                _waitingForJournalAck.Enqueue(batch);

                // NOTE(review): the task's outcome is not inspected, so an acknowledgement is
                // sent even when the append faulted and the batch is still passed to the executor.
                task.ContinueWith(t => self.Tell(JournalAcknowledgement.Instance));
            }
        }

        /// <summary>
        /// Buffers a command and flushes once the batch limit is reached.
        /// </summary>
        private void Accept(CommandContext command)
        {
            _commandBuffer.Add(command);

            // ">=" rather than "==": BatchSize is a public mutable field, so it can be zero,
            // negative or lowered below the current count, in which case an equality test
            // would never trigger a size-based flush.
            if (_commandBuffer.Count >= BatchSize)
            {
                Go();
            }
        }
    }
}
namespace AckAck
{
    /// <summary>
    /// Prevalence engine: routes commands through a dispatcher, a batching journal writer
    /// and an executor, all hosted in a private actor system.
    /// </summary>
    /// <typeparam name="M">Type of the in-memory model the commands operate on.</typeparam>
    public class Prevayler<M> : IDisposable
    {
        readonly ActorSystem _actorSystem;
        readonly ActorRef _dispatcher;

        // Kept so the Event Store connection it wraps can be closed in Dispose.
        readonly EventStoreJournal _journal;

        /// <summary>
        /// Connects to the Event Store at 127.0.0.1:1113 (blocking until connected) and
        /// builds the dispatcher -> journaler -> executor actor chain.
        /// </summary>
        /// <param name="model">The model instance commands are executed against.</param>
        /// <param name="batchSize">Batch size handed to the journal writer actor.</param>
        public Prevayler(M model, int batchSize = 100)
        {
            // the kernel is an origodb component which
            // synchronizes reads and writes to the model
            var kernel = new Kernel(model);

            var eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
            eventStore.ConnectAsync().Wait();

            // A local is used inside the Props expression below; the field only exists for Dispose.
            var journalWriter = new EventStoreJournal(eventStore);
            _journal = journalWriter;

            //build the chain of actors backwards
            _actorSystem = ActorSystem.Create("prevayler");

            //executor executes commands
            //it could also handle queries but would allow either a single query or command at time.
            //better to add a group of actors that can execute queries concurrently
            var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));

            //journaler writes commands to the journal in batches or at specific intervals
            //before passing to the executor
            var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor, batchSize, journalWriter)));

            //dispatcher prepares initial message and passes to journaler
            _dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
        }

        /// <summary>
        /// Sends a command to the dispatcher and returns a task for its result.
        /// </summary>
        public Task<R> ExecuteAsync<R>(Command<M,R> command)
        {
            return _dispatcher.Ask<R>(command);
        }

        /// <summary>
        /// Sends a command without a typed result to the dispatcher and returns a task for the reply.
        /// </summary>
        public Task ExecuteAsync(Command<M> command)
        {
            return _dispatcher.Ask(command);
        }

        /// <summary>
        /// Blocking variant of <see cref="ExecuteAsync{R}(Command{M,R})"/>; blocks the calling
        /// thread until the reply arrives.
        /// </summary>
        public R Execute<R>(Command<M, R> command)
        {
            return ExecuteAsync(command).Result;
        }

        /// <summary>
        /// Blocking variant of <see cref="ExecuteAsync(Command{M})"/>; blocks the calling
        /// thread until the reply arrives.
        /// </summary>
        public void Execute(Command<M> command)
        {
            ExecuteAsync(command).Wait();
        }

        /// <summary>
        /// Shuts the actor system down, waits for it to stop, then closes the journal's
        /// Event Store connection.
        /// </summary>
        public void Dispose()
        {
            _actorSystem.Shutdown();
            _actorSystem.WaitForShutdown();

            // Closed last so no actor can still be appending through the connection.
            _journal.Dispose();
        }
    }
}
/// <summary>
/// Executes 10000 AddItemCommand instances asynchronously against a fresh engine and
/// prints the elapsed time for the given batch size.
/// </summary>
/// <param name="batchSize">Journal batch size passed to the engine.</param>
[Test]
public void Smoke(int batchSize = 100)
{
    Console.WriteLine("Batch size: " + batchSize);

    // "using" guarantees the engine is disposed even when a command task faults
    // and Task.WaitAll throws.
    using (var prevayler = new Prevayler<List<string>>(new List<string>(), batchSize))
    {
        // Timing starts after construction, so connection setup is not measured.
        var sw = Stopwatch.StartNew();
        var tasks = Enumerable
            .Range(0, 10000)
            .Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString())))
            .ToArray();
        Task.WaitAll(tasks);
        sw.Stop();

        Console.WriteLine("async elapsed: " + sw.Elapsed);
    }
}
/// <summary>
/// Runs the smoke test for twelve batch sizes, starting at 10 and doubling each time
/// (10, 20, 40, ... 20480).
/// </summary>
[Test]
public void ProgressiveBatchSizes()
{
    for (var exponent = 0; exponent < 12; exponent++)
    {
        // 10 * 2^exponent, computed in integer arithmetic.
        Smoke(10 << exponent);
    }
}
Batch size: 10
async elapsed: 00:00:00.9428450
Batch size: 20
[INFO][2014-10-20 18:31:16][Thread 0015][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.3034449
Batch size: 40
async elapsed: 00:00:00.2783324
Batch size: 80
async elapsed: 00:00:00.2526579
Batch size: 160
[INFO][2014-10-20 18:31:16][Thread 0013][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.2682234
Batch size: 320
async elapsed: 00:00:00.2368135
Batch size: 640
async elapsed: 00:00:00.2893782
Batch size: 1280
async elapsed: 00:00:00.2482176
Batch size: 2560
async elapsed: 00:00:00.2987054
Batch size: 5120
async elapsed: 00:00:00.2967950
Batch size: 10240
async elapsed: 00:00:00.4178282
Batch size: 20480
async elapsed: 00:00:00.3720448
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment