Created
November 4, 2014 16:47
-
-
Save rofr/8c3cbaf9a46e6bfd41a0 to your computer and use it in GitHub Desktop.
Async OrigoDB spike using disruptor.net
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Linq; | |
using System.Net; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using Disruptor; | |
using Disruptor.Dsl; | |
using EventStore.ClientAPI; | |
namespace AckAck.Disruptive | |
{ | |
/// <summary>
/// Event handler that forwards every entry it receives to a second
/// disruptor by copying the entry's fields into a freshly published slot.
/// </summary>
public class Dispatcher : IEventHandler<BufferEntry>
{
    private readonly Disruptor<BufferEntry> _executionBuffer;

    /// <summary>Creates a dispatcher that publishes to <paramref name="executionBuffer"/>.</summary>
    /// <param name="executionBuffer">The disruptor that receives the forwarded entries.</param>
    public Dispatcher(Disruptor<BufferEntry> executionBuffer)
    {
        _executionBuffer = executionBuffer;
    }

    /// <summary>
    /// Publishes one event on the target disruptor carrying the same
    /// transaction and result block as <paramref name="data"/>.
    /// </summary>
    /// <param name="data">The entry delivered by the upstream disruptor.</param>
    /// <param name="sequence">Sequence of the entry (not used).</param>
    /// <param name="endOfBatch">Batch marker (not used).</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        _executionBuffer.PublishEvent((slot, slotSequence) => CopyInto(slot, data));
    }

    // Copies the payload fields from source to target and hands the target back,
    // which is the shape PublishEvent expects from its translator.
    private static BufferEntry CopyInto(BufferEntry target, BufferEntry source)
    {
        target.Transaction = source.Transaction;
        target.Result = source.Result;
        return target;
    }
}
/// <summary>
/// Event handler that gathers incoming entries into a fixed-size array and
/// appends their transactions to the journal, either when the disruptor
/// signals the end of a batch or when the array is full.
/// </summary>
public class Journaler : IEventHandler<BufferEntry>
{
    // Maximum number of entries written to the journal in a single append.
    private const int MaxBatchSize = 1024;

    private readonly BufferEntry[] _buffer = new BufferEntry[MaxBatchSize];
    private readonly IJournalWriter _journal;

    // Number of entries currently waiting in _buffer.
    private int _current;

    /// <summary>Creates a journaler that writes to <paramref name="journal"/>.</summary>
    /// <param name="journal">The writer that receives each batch of commands.</param>
    public Journaler(IJournalWriter journal)
    {
        _journal = journal;
    }

    /// <summary>
    /// Buffers <paramref name="data"/> and flushes the buffer when it is full
    /// or when <paramref name="endOfBatch"/> is set.
    /// </summary>
    /// <param name="data">The entry delivered by the disruptor.</param>
    /// <param name="sequence">Sequence of the entry (not used).</param>
    /// <param name="endOfBatch">True when this is the last entry of the current batch.</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        // Make room first so the store below can never run past the array.
        if (_current == _buffer.Length)
        {
            Process();
        }

        _buffer[_current++] = data;

        if (endOfBatch)
        {
            Process();
        }
    }

    /// <summary>
    /// Appends the buffered transactions to the journal, blocks until the
    /// append has finished, then resets the buffer.
    /// </summary>
    private void Process()
    {
        // Snapshot the batch eagerly: _buffer is reused for the next batch, so a
        // deferred query must not be handed to an asynchronous writer.
        // AsEnumerable keeps the argument's static type the same as before.
        Command[] batch = _buffer
            .Take(_current)
            .Select(entry => entry.Transaction as Command)
            .ToArray();

        _journal.AppendAsync(batch.AsEnumerable()).Wait();
        _current = 0;
    }
}
/// <summary>
/// Slot type stored in the disruptor ring buffers: one transaction plus the
/// block through which its result is handed back.
/// </summary>
public class BufferEntry
{
    /// <summary>Receives the value produced by executing <see cref="Transaction"/>.</summary>
    public WriteOnceBlock<object> Result;

    /// <summary>The work item carried by this slot; either a query or a command.</summary>
    public object Transaction;
}
/// <summary>
/// Executes commands and queries against a model of type <typeparamref name="M"/>.
/// </summary>
/// <typeparam name="M">The model type the transactions operate on.</typeparam>
public interface IEngine<M> : IDisposable
{
    // ---- Blocking API ----

    /// <summary>Executes a command that has no result.</summary>
    /// <param name="command">The command to execute.</param>
    void Execute(Command<M> command);

    /// <summary>Executes a command and returns its result.</summary>
    /// <typeparam name="R">The result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    R Execute<R>(Command<M, R> command);

    // ---- Task-based API ----

    /// <summary>Executes a command that has no result.</summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task representing the execution.</returns>
    Task ExecuteAsync(Command<M> command);

    /// <summary>Executes a command and yields its result.</summary>
    /// <typeparam name="R">The result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task producing the command's result.</returns>
    Task<R> ExecuteAsync<R>(Command<M, R> command);

    /// <summary>Executes a query and yields its result.</summary>
    /// <typeparam name="R">The result type of the query.</typeparam>
    /// <param name="query">The query to execute.</param>
    /// <returns>A task producing the query's result.</returns>
    Task<R> ExecuteAsync<R>(Query<M, R> query);
}
/// <summary>
/// <see cref="IEngine{M}"/> implementation built on two disruptors: a command
/// queue whose entries are journaled and then dispatched, and an execution
/// disruptor whose single handler runs transactions against the kernel.
/// Queries are published straight to the execution disruptor; commands go
/// through the command queue first.
/// </summary>
/// <typeparam name="M">The model type the transactions operate on.</typeparam>
public class DisruptorEngine<M> : IEngine<M>
{
    // Slot count used for both ring buffers.
    private const int RingBufferSize = 1024;

    // Endpoint of the event store that backs the journal.
    private const string EventStoreHost = "127.0.0.1";
    private const int EventStorePort = 1113;

    private readonly Disruptor<BufferEntry> _commandQueue;
    private readonly Disruptor<BufferEntry> _executionDisruptor;

    // Kept so Dispose can release the connection opened by the constructor.
    private readonly IDisposable _eventStore;

    /// <summary>
    /// Builds both disruptors, connects to the event store (blocking until the
    /// connection attempt completes), wires the handlers and starts processing.
    /// </summary>
    /// <param name="model">The model instance handed to the kernel.</param>
    public DisruptorEngine(M model)
    {
        Kernel kernel = new Kernel(model);

        _executionDisruptor = new Disruptor<BufferEntry>(
            () => new BufferEntry(),
            new SingleThreadedClaimStrategy(RingBufferSize),
            new YieldingWaitStrategy(),
            TaskScheduler.Default);
        _executionDisruptor.HandleEventsWith(new ExecutionHandler(kernel));

        _commandQueue = new Disruptor<BufferEntry>(
            () => new BufferEntry(),
            new MultiThreadedClaimStrategy(RingBufferSize),
            new YieldingWaitStrategy(),
            TaskScheduler.Default);

        var eventStore = EventStoreConnection.Create(
            new IPEndPoint(IPAddress.Parse(EventStoreHost), EventStorePort));
        _eventStore = eventStore;
        try
        {
            eventStore.ConnectAsync().Wait();
        }
        catch
        {
            // Nothing has been started yet; release the connection before propagating.
            _eventStore.Dispose();
            throw;
        }

        var journalWriter = new EventStoreJournal(eventStore);
        _commandQueue.HandleEventsWith(new Journaler(journalWriter))
            .Then(new Dispatcher(_executionDisruptor));

        // Start the downstream stage first so dispatched entries have a running consumer.
        _executionDisruptor.Start();
        _commandQueue.Start();
    }

    /// <summary>Publishes a command on the command queue and awaits its result.</summary>
    /// <typeparam name="R">The result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task producing the value posted for this command, cast to <typeparamref name="R"/>.</returns>
    public async Task<R> ExecuteAsync<R>(Command<M, R> command)
    {
        var completion = Publish(_commandQueue, command);
        return (R) await completion.ReceiveAsync();
    }

    /// <summary>Publishes a command on the command queue.</summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>The completion task of the block that receives the command's outcome.</returns>
    public Task ExecuteAsync(Command<M> command)
    {
        return Publish(_commandQueue, command).Completion;
    }

    /// <summary>Publishes a query directly on the execution disruptor and awaits its result.</summary>
    /// <typeparam name="R">The result type of the query.</typeparam>
    /// <param name="query">The query to execute.</param>
    /// <returns>A task producing the value posted for this query, cast to <typeparamref name="R"/>.</returns>
    public async Task<R> ExecuteAsync<R>(Query<M, R> query)
    {
        var completion = Publish(_executionDisruptor, query);
        return (R) await completion.ReceiveAsync();
    }

    /// <summary>Blocking counterpart of <see cref="ExecuteAsync{R}(Command{M, R})"/>.</summary>
    /// <typeparam name="R">The result type of the command.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>The command's result.</returns>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>Blocking counterpart of <see cref="ExecuteAsync(Command{M})"/>.</summary>
    /// <param name="command">The command to execute.</param>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Shuts down both disruptors (upstream first) and then releases the
    /// event store connection.
    /// </summary>
    public void Dispose()
    {
        _commandQueue.Shutdown();
        _executionDisruptor.Shutdown();
        _eventStore.Dispose();
    }

    // Publishes one entry carrying the transaction on the given disruptor and
    // returns the block through which its result will be delivered.
    private static WriteOnceBlock<object> Publish(Disruptor<BufferEntry> target, object transaction)
    {
        var completion = new WriteOnceBlock<object>(value => value);
        target.PublishEvent((entry, sequence) =>
        {
            entry.Transaction = transaction;
            entry.Result = completion;
            return entry;
        });
        return completion;
    }
}
/// <summary>
/// Event handler that runs each entry's transaction against the kernel and
/// posts the outcome to the entry's result block.
/// </summary>
public class ExecutionHandler : IEventHandler<BufferEntry>
{
    private readonly Kernel _kernel;

    /// <summary>Creates a handler that executes transactions on <paramref name="kernel"/>.</summary>
    /// <param name="kernel">The kernel that executes commands and queries.</param>
    public ExecutionHandler(Kernel kernel)
    {
        _kernel = kernel;
    }

    /// <summary>
    /// Executes <c>data.Transaction</c> as a command when it is one, otherwise
    /// as a query, and posts the returned value to <c>data.Result</c>.
    /// If execution throws, the result block is faulted before the exception
    /// is rethrown, so a caller waiting on that block is not left pending forever.
    /// </summary>
    /// <param name="data">The entry to execute.</param>
    /// <param name="sequence">Sequence of the entry (not used).</param>
    /// <param name="endOfBatch">Batch marker (not used).</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        object result;
        try
        {
            var command = data.Transaction as Command;
            if (command != null)
            {
                result = _kernel.Execute(command);
            }
            else
            {
                result = _kernel.Execute(data.Transaction as Query);
            }
        }
        catch (Exception failure)
        {
            // Fault is an explicit IDataflowBlock member, hence the interface-typed local.
            var pending = data.Result as IDataflowBlock;
            if (pending != null)
            {
                pending.Fault(failure);
            }

            // Rethrow so the disruptor's own exception handling still sees the failure.
            throw;
        }

        data.Result.Post(result);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
There ought to be a better way to complete the callback (the `data.Result.Post(result)` call in `ExecutionHandler.OnNext`) than using a WriteOnceBlock. Calls originate from ExecuteAsync; see the `ExecuteAsync<R>(Command<M, R>)` overload.