Skip to content

Instantly share code, notes, and snippets.

@rofr
Created November 4, 2014 16:47
Show Gist options
  • Save rofr/8c3cbaf9a46e6bfd41a0 to your computer and use it in GitHub Desktop.
Save rofr/8c3cbaf9a46e6bfd41a0 to your computer and use it in GitHub Desktop.
Async OrigoDB spike using disruptor.net
using System;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Disruptor;
using Disruptor.Dsl;
using EventStore.ClientAPI;
namespace AckAck.Disruptive
{
/// <summary>
/// Event handler that forwards every entry it receives into another disruptor.
/// </summary>
public class Dispatcher : IEventHandler<BufferEntry>
{
    private readonly Disruptor<BufferEntry> _target;

    /// <summary>
    /// Creates a dispatcher that republishes incoming entries to <paramref name="executionBuffer"/>.
    /// </summary>
    /// <param name="executionBuffer">The disruptor that receives the forwarded entries.</param>
    public Dispatcher(Disruptor<BufferEntry> executionBuffer)
    {
        _target = executionBuffer;
    }

    /// <summary>
    /// Copies the transaction and result block of <paramref name="data"/> into a slot
    /// of the target disruptor and publishes it.
    /// </summary>
    /// <param name="data">The entry delivered by the upstream ring buffer.</param>
    /// <param name="sequence">Sequence number of the entry (not used).</param>
    /// <param name="endOfBatch">Whether this is the last entry of the batch (not used).</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        _target.PublishEvent((slot, slotSequence) =>
        {
            // Only the field values are copied; the upstream entry object itself is not handed over.
            slot.Transaction = data.Transaction;
            slot.Result = data.Result;
            return slot;
        });
    }
}
/// <summary>
/// Event handler that collects the commands of a batch and appends them to the journal.
/// </summary>
/// <remarks>
/// Entries are accumulated until the local buffer is full or the disruptor signals the end
/// of a batch; the accumulated commands are then written with one blocking
/// <c>AppendAsync</c> call.
/// </remarks>
public class Journaler : IEventHandler<BufferEntry>
{
    // Maximum number of entries written by a single journal append.
    private const int BatchCapacity = 1024;

    private readonly BufferEntry[] _buffer = new BufferEntry[BatchCapacity];
    private readonly IJournalWriter _journal;

    // Number of entries currently waiting in _buffer.
    private int _current;

    /// <summary>
    /// Creates a journaler that writes to <paramref name="journal"/>.
    /// </summary>
    /// <param name="journal">The writer that receives each batch of commands.</param>
    /// <exception cref="ArgumentNullException"><paramref name="journal"/> is null.</exception>
    public Journaler(IJournalWriter journal)
    {
        if (journal == null)
        {
            throw new ArgumentNullException("journal");
        }

        _journal = journal;
    }

    /// <summary>
    /// Buffers <paramref name="data"/> and flushes the buffer when it is full or the batch ends.
    /// </summary>
    /// <param name="data">The entry delivered by the ring buffer.</param>
    /// <param name="sequence">Sequence number of the entry (not used).</param>
    /// <param name="endOfBatch">True when this is the last entry of the current batch.</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        // Make room first so the store below can never run past the end of the buffer.
        if (_current == _buffer.Length)
        {
            Process();
        }

        _buffer[_current++] = data;

        if (endOfBatch)
        {
            Process();
        }
    }

    /// <summary>
    /// Writes the buffered commands to the journal, blocking until the append completes,
    /// then resets the buffer.
    /// </summary>
    private void Process()
    {
        // Materialise the commands up front: the buffered entries are reusable, mutable
        // slots, so the journal writer must not receive a lazy query over them.
        var commands = _buffer
            .Take(_current)
            .Select(entry => entry.Transaction as Command)
            .ToArray();

        _journal.AppendAsync(commands).Wait();
        _current = 0;
    }
}
/// <summary>
/// Mutable entry type of the disruptor ring buffers in this file: it pairs a
/// transaction with the dataflow block used to hand back its result.
/// </summary>
public class BufferEntry
{
// The transaction to process: either a query or a command.
public object Transaction;
// Block that the execution handler posts the transaction's result to.
public WriteOnceBlock<object> Result;
}
/// <summary>
/// Executes commands and queries against a model of type <typeparamref name="M"/>,
/// with both asynchronous and blocking entry points.
/// </summary>
/// <typeparam name="M">The model type the commands and queries are written for.</typeparam>
public interface IEngine<M> : IDisposable
{
/// <summary>Executes a command asynchronously and yields its result of type <typeparamref name="R"/>.</summary>
Task<R> ExecuteAsync<R>(Command<M, R> command);
/// <summary>Executes a command that has no result asynchronously.</summary>
Task ExecuteAsync(Command<M> command);
/// <summary>Executes a query asynchronously and yields its result of type <typeparamref name="R"/>.</summary>
Task<R> ExecuteAsync<R>(Query<M, R> query);
/// <summary>Executes a command and returns its result of type <typeparamref name="R"/>.</summary>
R Execute<R>(Command<M, R> command);
/// <summary>Executes a command that has no result.</summary>
void Execute(Command<M> command);
}
/// <summary>
/// <see cref="IEngine{M}"/> implementation built on two disruptors: a command queue whose
/// entries are journaled and then dispatched, and an execution disruptor whose entries are
/// run against the kernel.
/// </summary>
/// <remarks>
/// Commands are published to the command queue (journaler, then dispatcher). Queries are
/// published straight to the execution disruptor and therefore bypass the journal.
/// </remarks>
/// <typeparam name="M">The model type the commands and queries are written for.</typeparam>
public class DisruptorEngine<M> : IEngine<M>
{
    // Number of slots in each ring buffer.
    private const int RingSize = 1024;

    private readonly Disruptor<BufferEntry> _commandQueue;
    private readonly Disruptor<BufferEntry> _executionDisruptor;

    // Kept only so Dispose can release the event store connection opened by the constructor.
    private readonly IDisposable _eventStoreConnection;

    /// <summary>
    /// Builds both disruptors, connects to the event store at 127.0.0.1:1113 (blocking until
    /// connected) and starts processing.
    /// </summary>
    /// <param name="model">The model instance handed to the kernel.</param>
    public DisruptorEngine(M model)
    {
        Kernel kernel = new Kernel(model);

        // The execution disruptor has more than one publishing thread: the dispatcher
        // running on the command queue's handler, and every thread that calls
        // ExecuteAsync with a query. It therefore needs the multi-threaded claim strategy.
        _executionDisruptor = new Disruptor<BufferEntry>(
            () => new BufferEntry(),
            new MultiThreadedClaimStrategy(RingSize),
            new YieldingWaitStrategy(),
            TaskScheduler.Default);
        _executionDisruptor.HandleEventsWith(new ExecutionHandler(kernel));

        _commandQueue = new Disruptor<BufferEntry>(
            () => new BufferEntry(),
            new MultiThreadedClaimStrategy(RingSize),
            new YieldingWaitStrategy(),
            TaskScheduler.Default);

        var eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
        eventStore.ConnectAsync().Wait();
        _eventStoreConnection = eventStore;

        var journalWriter = new EventStoreJournal(eventStore);
        _commandQueue.HandleEventsWith(new Journaler(journalWriter))
            .Then(new Dispatcher(_executionDisruptor));

        // Start the consumer side first so dispatched entries always have a running handler.
        _executionDisruptor.Start();
        _commandQueue.Start();
    }

    /// <summary>
    /// Publishes <paramref name="command"/> to the command queue and awaits the value
    /// posted to its result block.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>The posted result cast to <typeparamref name="R"/>.</returns>
    public async Task<R> ExecuteAsync<R>(Command<M, R> command)
    {
        var completion = Publish(_commandQueue, command);
        return (R) await completion.ReceiveAsync();
    }

    /// <summary>
    /// Publishes <paramref name="command"/> to the command queue.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>The <c>Completion</c> task of the command's result block.</returns>
    public Task ExecuteAsync(Command<M> command)
    {
        return Publish(_commandQueue, command).Completion;
    }

    /// <summary>
    /// Publishes <paramref name="query"/> directly to the execution disruptor and awaits
    /// the value posted to its result block.
    /// </summary>
    /// <param name="query">The query to execute.</param>
    /// <returns>The posted result cast to <typeparamref name="R"/>.</returns>
    public async Task<R> ExecuteAsync<R>(Query<M, R> query)
    {
        var completion = Publish(_executionDisruptor, query);
        return (R) await completion.ReceiveAsync();
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync{R}(Command{M, R})"/>.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    /// <returns>The command's result.</returns>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync(Command{M})"/>.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Shuts down both disruptors and releases the event store connection.
    /// </summary>
    public void Dispose()
    {
        // Upstream first, and the connection last, so the journaler is stopped
        // before the connection it writes to goes away.
        _commandQueue.Shutdown();
        _executionDisruptor.Shutdown();
        _eventStoreConnection.Dispose();
    }

    // Claims a slot on the given disruptor, stores the transaction together with a fresh
    // result block in it, publishes the slot and returns the result block.
    private static WriteOnceBlock<object> Publish(Disruptor<BufferEntry> target, object transaction)
    {
        var completion = new WriteOnceBlock<object>(value => value);
        target.PublishEvent((entry, sequence) =>
        {
            entry.Transaction = transaction;
            entry.Result = completion;
            return entry;
        });
        return completion;
    }
}
/// <summary>
/// Event handler that runs each entry's transaction against the kernel and posts the
/// outcome to the entry's result block.
/// </summary>
public class ExecutionHandler : IEventHandler<BufferEntry>
{
    private readonly Kernel _kernel;

    /// <summary>
    /// Creates a handler that executes transactions with <paramref name="kernel"/>.
    /// </summary>
    /// <param name="kernel">The kernel that executes commands and queries.</param>
    public ExecutionHandler(Kernel kernel)
    {
        _kernel = kernel;
    }

    /// <summary>
    /// Executes the transaction in <paramref name="data"/> as a command when it is one,
    /// otherwise as a query, and posts the result to <c>data.Result</c>.
    /// </summary>
    /// <remarks>
    /// If execution throws, the result block is faulted before the exception is rethrown,
    /// so code waiting on the block is released instead of waiting forever. The rethrow
    /// leaves the exception visible to the disruptor exactly as before.
    /// </remarks>
    /// <param name="data">The entry holding the transaction and its result block.</param>
    /// <param name="sequence">Sequence number of the entry (not used).</param>
    /// <param name="endOfBatch">Whether this is the last entry of the batch (not used).</param>
    public void OnNext(BufferEntry data, long sequence, bool endOfBatch)
    {
        object result = null;
        try
        {
            // A single 'as' test replaces the former 'is' check followed by a second cast.
            var command = data.Transaction as Command;
            if (command != null)
            {
                result = _kernel.Execute(command);
            }
            else
            {
                result = _kernel.Execute(data.Transaction as Query);
            }
        }
        catch (Exception ex)
        {
            // Guard against a missing block so the original exception is never masked.
            if (data.Result != null)
            {
                ((IDataflowBlock)data.Result).Fault(ex);
            }

            throw;
        }

        data.Result.Post(result);
    }
}
}
@rofr
Copy link
Author

rofr commented Nov 4, 2014

There ought to be a better way to complete the callback on line 187 than using a WriteOnceBlock. The calls originate from ExecuteAsync; see the overload on line 111.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment