Created
October 20, 2014 16:37
-
-
Save rofr/9d9e1cc76b7aef26d8f3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AckAck
{
    /// <summary>
    /// <see cref="IJournalWriter"/> implementation that appends commands as
    /// events to the "akka" stream of an Event Store connection.
    /// </summary>
    public class EventStoreJournal : IJournalWriter
    {
        private readonly IEventStoreConnection _eventStore;
        private readonly IFormatter _formatter;

        /// <summary>
        /// Creates a journal on top of the given connection. The connection is
        /// closed when this journal is disposed.
        /// </summary>
        /// <param name="connection">Event Store connection to append to.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="connection"/> is null.
        /// </exception>
        public EventStoreJournal(IEventStoreConnection connection)
        {
            if (connection == null)
            {
                throw new ArgumentNullException("connection");
            }

            _formatter = new BinaryFormatter();
            _eventStore = connection;
        }

        /// <summary>
        /// Serializes the commands and appends them to the "akka" stream as a
        /// single write, accepting any current stream version.
        /// </summary>
        /// <param name="commands">Commands to journal, in order.</param>
        /// <returns>A task that completes when the append has completed.</returns>
        public async Task AppendAsync(IEnumerable<Command> commands)
        {
            // Materialize before the first await so that serialization runs
            // synchronously on the caller's thread, instead of whenever the
            // client happens to enumerate the deferred sequence. The single
            // formatter instance is shared by all calls.
            var events = commands.Select(ToEventData).ToList();

            await _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any, events);
        }

        /// <summary>
        /// Wraps one command in an event with a fresh id and the serialized
        /// command as its payload.
        /// </summary>
        private EventData ToEventData(Command command)
        {
            var id = Guid.NewGuid();

            // NOTE(review): this requires every command type to be serializable
            // by the formatter - confirm against the Command hierarchy.
            using (var stream = new System.IO.MemoryStream())
            {
                _formatter.Serialize(stream, command);
                return new EventData(id, "akka", false, stream.ToArray(), null);
            }
        }

        /// <summary>
        /// Closes the underlying Event Store connection.
        /// </summary>
        public void Dispose()
        {
            _eventStore.Close();
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AckAck
{
    /// <summary>
    /// Append multiple commands accumulated during a specific time period or up
    /// to a specific limit. Once a batch has been acknowledged by the journal it
    /// is passed on to the executor actor.
    /// </summary>
    public class JournalWriter : ReceiveActor
    {
        private readonly IJournalWriter _journalWriter;

        //number of commands at a time to journal
        public int BatchSize = 100;

        //or after the receive timeout elapses, whichever comes first
        public TimeSpan Interval = TimeSpan.FromMilliseconds(10);

        //buffered commands waiting to be written to the journal
        private readonly List<CommandContext> _commandBuffer = new List<CommandContext>(200000);

        //batches handed to the journal but not yet acknowledged, oldest first
        private readonly Queue<CommandContext[]> _waitingForJournalAck = new Queue<CommandContext[]>(200000);

        //pass on the journaled commands to this actor
        private readonly ActorRef _executor;

        /// <summary>
        /// Wires up the message handlers and the receive timeout.
        /// </summary>
        /// <param name="executor">Actor that receives each journaled batch.</param>
        /// <param name="batchSize">Number of buffered commands that triggers a write.</param>
        /// <param name="journalWriter">Journal the batches are appended to.</param>
        public JournalWriter(ActorRef executor, int batchSize, IJournalWriter journalWriter)
        {
            _journalWriter = journalWriter;
            BatchSize = batchSize;
            _executor = executor;

            Receive<CommandContext>(t => Accept(t));
            Receive<ReceiveTimeout>(_ => Go());
            //each acknowledgement releases the oldest outstanding batch
            Receive<JournalAcknowledgement>(_ => _executor.Tell(_waitingForJournalAck.Dequeue()));

            SetReceiveTimeout(Interval);
        }

        /// <summary>
        /// Sends the buffered commands to the journal as one batch and queues
        /// the batch until the journal acknowledges it. Does nothing when the
        /// buffer is empty.
        /// </summary>
        private void Go()
        {
            if (_commandBuffer.Count == 0)
            {
                return;
            }

            //capture Self now; the continuation below runs outside the actor's message handling
            var self = Self;
            var batch = _commandBuffer.ToArray();
            var task = _journalWriter.AppendAsync(batch.Select(item => item.Command));
            _commandBuffer.Clear();
            _waitingForJournalAck.Enqueue(batch);

            //NOTE(review): the continuation has no TaskContinuationOptions, so the
            //acknowledgement is sent even if the append faulted or was cancelled,
            //and the batch is then executed without having been journaled.
            task.ContinueWith(t => self.Tell(JournalAcknowledgement.Instance));
        }

        /// <summary>
        /// Buffers a command and flushes as soon as the batch limit is reached.
        /// </summary>
        private void Accept(CommandContext command)
        {
            _commandBuffer.Add(command);

            //>= rather than ==: BatchSize is publicly mutable and may be zero,
            //negative or lowered below the current count, in which case an
            //equality test would never fire again.
            if (_commandBuffer.Count >= BatchSize)
            {
                Go();
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AckAck
{
    /// <summary>
    /// Prevalence engine. Commands are sent to a dispatcher actor, written to
    /// an Event Store journal in batches and then executed against the model.
    /// </summary>
    /// <typeparam name="M">Type of the in-memory model.</typeparam>
    public class Prevayler<M> : IDisposable
    {
        readonly ActorSystem _actorSystem;
        readonly ActorRef _dispatcher;

        //owned by this instance; closing it closes the event store connection
        readonly EventStoreJournal _journal;

        /// <summary>
        /// Connects to the event store at 127.0.0.1:1113 (blocking until
        /// connected) and builds the dispatcher, journaler and executor actors.
        /// </summary>
        /// <param name="model">The model commands are executed against.</param>
        /// <param name="batchSize">Number of commands the journaler writes at a time.</param>
        public Prevayler(M model, int batchSize = 100)
        {
            // the kernel is an origodb component which
            // synchronizes reads and writes to the model;
            // it is handed to the executor below
            var kernel = new Kernel(model);

            var eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
            eventStore.ConnectAsync().Wait();
            var journalWriter = new EventStoreJournal(eventStore);
            _journal = journalWriter;

            //build the chain of actors backwards
            _actorSystem = ActorSystem.Create("prevayler");

            //executor executes commands
            //it could also handle queries but would allow either a single query or command at time.
            //better to add a group of actors that can execute queries concurrently
            var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));

            //journaler writes commands to the journal in batches or at specific intervals
            //before passing to the executor
            var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor, batchSize, journalWriter)));

            //dispatcher prepares initial message and passes to journaler
            _dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
        }

        /// <summary>
        /// Sends a command that produces a result to the dispatcher.
        /// </summary>
        /// <returns>A task yielding the dispatcher's reply.</returns>
        public Task<R> ExecuteAsync<R>(Command<M,R> command)
        {
            return _dispatcher.Ask<R>(command);
        }

        /// <summary>
        /// Sends a command without a typed result to the dispatcher.
        /// </summary>
        /// <returns>A task that completes when the dispatcher replies.</returns>
        public Task ExecuteAsync(Command<M> command)
        {
            return _dispatcher.Ask(command);
        }

        /// <summary>
        /// Blocking variant of <see cref="ExecuteAsync{R}(Command{M,R})"/>.
        /// </summary>
        public R Execute<R>(Command<M, R> command)
        {
            return ExecuteAsync(command).Result;
        }

        /// <summary>
        /// Blocking variant of <see cref="ExecuteAsync(Command{M})"/>.
        /// </summary>
        public void Execute(Command<M> command)
        {
            ExecuteAsync(command).Wait();
        }

        /// <summary>
        /// Shuts down the actor system, waits for it to stop and then closes
        /// the journal and its event store connection.
        /// </summary>
        public void Dispose()
        {
            try
            {
                _actorSystem.Shutdown();
                _actorSystem.WaitForShutdown();
            }
            finally
            {
                //close the connection only after the actors have stopped,
                //and even if the shutdown itself throws
                _journal.Dispose();
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Times 10000 asynchronous AddItemCommand executions against a fresh engine
/// and prints the elapsed time. Engine construction is not part of the timing.
/// </summary>
/// <param name="batchSize">Journal batch size passed to the engine.</param>
[Test]
public void Smoke(int batchSize = 100)
{
    Console.WriteLine("Batch size: " + batchSize);
    var sw = new Stopwatch();

    //using guarantees the engine is disposed even if a command task fails
    using (var prevayler = new Prevayler<List<string>>(new List<string>(), batchSize))
    {
        sw.Start();
        var tasks = Enumerable
            .Range(0, 10000)
            .Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString())))
            .ToArray();
        Task.WaitAll(tasks);
        sw.Stop();
        Console.WriteLine("async elapsed: " + sw.Elapsed);
    }
}
/// <summary>
/// Runs the smoke test twelve times, doubling the batch size on each run:
/// 10, 20, 40, ... up to 20480.
/// </summary>
[Test]
public void ProgressiveBatchSizes()
{
    for (var doublings = 0; doublings < 12; doublings++)
    {
        //10 * 2^doublings, computed with integers
        Smoke(10 << doublings);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Batch size: 10
async elapsed: 00:00:00.9428450
Batch size: 20
[INFO][2014-10-20 18:31:16][Thread 0015][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.3034449
Batch size: 40
async elapsed: 00:00:00.2783324
Batch size: 80
async elapsed: 00:00:00.2526579
Batch size: 160
[INFO][2014-10-20 18:31:16][Thread 0013][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.2682234
Batch size: 320
async elapsed: 00:00:00.2368135
Batch size: 640
async elapsed: 00:00:00.2893782
Batch size: 1280
async elapsed: 00:00:00.2482176
Batch size: 2560
async elapsed: 00:00:00.2987054
Batch size: 5120
async elapsed: 00:00:00.2967950
Batch size: 10240
async elapsed: 00:00:00.4178282
Batch size: 20480
async elapsed: 00:00:00.3720448 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment