Skip to content

Instantly share code, notes, and snippets.

@rofr
Created November 3, 2014 21:14
Show Gist options
  • Save rofr/0605c4fdf214c22014a4 to your computer and use it in GitHub Desktop.
Save rofr/0605c4fdf214c22014a4 to your computer and use it in GitHub Desktop.
/// <summary>
/// Funnels command batches and queries through one executor block.
/// Command batches are executed one command after another; queries are
/// collected into batches and the queries of a batch run concurrently.
/// The executor is created with default Dataflow options, so it handles one
/// message (one command batch or one query batch) at a time.
/// </summary>
public class Dispatcher : IDisposable
{
    private readonly Kernel _kernel;
    readonly BufferBlock<CommandContext[]> _commandQueue;
    readonly BatchBlock<QueryContext> _queryQueue;
    readonly ActionBlock<object> _executor;
    readonly Timer _timer;

    /// <summary>
    /// Maximum latency for queries. Read once while the dispatcher is being
    /// constructed; assigning it afterwards does not reconfigure the timer.
    /// </summary>
    public TimeSpan Interval = TimeSpan.FromMilliseconds(1);

    /// <summary>
    /// Size of a query batch. Read once while the dispatcher is being
    /// constructed; assigning it afterwards does not resize the batch block.
    /// </summary>
    public int MaxConcurrentQueries = 4;

    /// <summary>
    /// Creates a dispatcher with the default query latency (1 ms) and the
    /// default query batch size (4).
    /// </summary>
    /// <param name="kernel">Kernel that executes commands and queries.</param>
    public Dispatcher(Kernel kernel)
        : this(kernel, TimeSpan.FromMilliseconds(1), 4)
    {
    }

    /// <summary>
    /// Creates a dispatcher with an explicit query latency and batch size.
    /// </summary>
    /// <param name="kernel">Kernel that executes commands and queries.</param>
    /// <param name="interval">How often a partially filled query batch is flushed.</param>
    /// <param name="maxConcurrentQueries">Number of queries that make up a full batch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="kernel"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="interval"/> is not positive or
    /// <paramref name="maxConcurrentQueries"/> is less than one.
    /// </exception>
    public Dispatcher(Kernel kernel, TimeSpan interval, int maxConcurrentQueries)
    {
        if (kernel == null)
        {
            throw new ArgumentNullException("kernel");
        }
        if (interval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException("interval");
        }
        if (maxConcurrentQueries < 1)
        {
            throw new ArgumentOutOfRangeException("maxConcurrentQueries");
        }

        _kernel = kernel;
        Interval = interval;
        MaxConcurrentQueries = maxConcurrentQueries;

        _commandQueue = new BufferBlock<CommandContext[]>();
        _queryQueue = new BatchBlock<QueryContext>(MaxConcurrentQueries);

        // An asynchronous delegate lets the block wait for a query batch
        // without parking a thread pool thread for the duration.
        _executor = new ActionBlock<object>(new Func<object, Task>(ExecuteWorkItem));

        _commandQueue.LinkTo(_executor);
        _queryQueue.LinkTo(_executor);

        // Periodically flush whatever queries are waiting so that a batch
        // that never fills up is still executed within Interval.
        _timer = new Timer(_ => _queryQueue.TriggerBatch());
        _timer.Change(Interval, Interval);
    }

    /// <summary>
    /// Executes one message taken from the executor block: either a batch of
    /// queries (run concurrently) or a batch of commands (run sequentially).
    /// Messages of any other type are ignored.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an exception thrown by the kernel escapes this method
    /// and faults the executor block; the responses of the affected and of
    /// all later requests are then never posted. Confirm whether callers
    /// should be notified instead.
    /// </remarks>
    private async Task ExecuteWorkItem(object workItem)
    {
        var queries = workItem as QueryContext[];
        if (queries != null)
        {
            var running = queries.Select(q => Task.Run(() => ExecuteQuery(q)));
            await Task.WhenAll(running).ConfigureAwait(false);
            return;
        }

        var commands = workItem as CommandContext[];
        if (commands != null)
        {
            foreach (var commandContext in commands)
            {
                var result = _kernel.Execute(commandContext.Command);
                commandContext.Response.Post(result);
            }
        }
    }

    /// <summary>
    /// Runs a single query against the kernel and posts the result to the
    /// query's response block.
    /// </summary>
    void ExecuteQuery(QueryContext context)
    {
        var result = _kernel.Execute(context.Query);
        context.Response.Post(result);
    }

    /// <summary>
    /// Queues a batch of commands for sequential execution.
    /// </summary>
    internal void Post(CommandContext[] commands)
    {
        _commandQueue.Post(commands);
    }

    /// <summary>
    /// Queues a query; it is executed when its batch is full or when the
    /// timer flushes the batch, whichever happens first.
    /// </summary>
    internal void Post(QueryContext query)
    {
        _queryQueue.Post(query);
    }

    /// <summary>
    /// Stops the periodic query flush. Queries still waiting in a partially
    /// filled batch are not flushed by this call.
    /// </summary>
    public void Dispose()
    {
        _timer.Dispose();
    }
}
}
/// <summary>
/// Entry point that wires a model to a journal writer and a dispatcher.
/// Commands are handed to the journal writer; queries go straight to the
/// dispatcher. Each request gets its own write-once response block that the
/// caller awaits.
/// </summary>
/// <typeparam name="M">Type of the in-memory model.</typeparam>
public class Prevayler<M> : IDisposable
{
    /// <summary>Port used when no event store endpoint is supplied.</summary>
    private const int DefaultEventStorePort = 1113;

    private readonly TplJournalWriter _journalWriter;
    private readonly Dispatcher _dispatcher;

    /// <summary>
    /// Creates an engine that journals to an event store on 127.0.0.1:1113.
    /// </summary>
    /// <param name="model">The model commands and queries operate on.</param>
    /// <param name="batchSize">Number of commands journaled per batch.</param>
    public Prevayler(M model, int batchSize)
        : this(model, batchSize, new IPEndPoint(IPAddress.Loopback, DefaultEventStorePort))
    {
    }

    /// <summary>
    /// Creates an engine that journals to the event store at the given endpoint.
    /// The constructor blocks until the connection attempt has finished.
    /// </summary>
    /// <param name="model">The model commands and queries operate on.</param>
    /// <param name="batchSize">Number of commands journaled per batch.</param>
    /// <param name="eventStoreEndPoint">Address of the event store to connect to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStoreEndPoint"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is less than one.</exception>
    public Prevayler(M model, int batchSize, IPEndPoint eventStoreEndPoint)
    {
        if (eventStoreEndPoint == null)
        {
            throw new ArgumentNullException("eventStoreEndPoint");
        }
        if (batchSize < 1)
        {
            throw new ArgumentOutOfRangeException("batchSize");
        }

        // Connect before the dispatcher exists: the dispatcher starts a timer
        // in its constructor, which would be left running if the connection
        // attempt threw afterwards.
        var eventStore = EventStoreConnection.Create(eventStoreEndPoint);
        eventStore.ConnectAsync().Wait();
        var journalWriter = new EventStoreJournal(eventStore);

        // The kernel is the OrigoDB component the dispatcher uses to execute
        // commands and queries against the model.
        var kernel = new Kernel(model);
        _dispatcher = new Dispatcher(kernel);
        _journalWriter = new TplJournalWriter(journalWriter, _dispatcher, batchSize);
    }

    /// <summary>
    /// Submits a command for journaling and execution.
    /// </summary>
    /// <returns>A task that yields the command's result cast to <typeparamref name="R"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
    public async Task<R> ExecuteAsync<R>(Command<M, R> command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }

        var response = new WriteOnceBlock<object>(r => r);
        _journalWriter.Post(new CommandContext(command, response));
        return (R)await response.ReceiveAsync();
    }

    /// <summary>
    /// Submits a command that produces no result.
    /// </summary>
    /// <returns>A task that completes once a response has been posted for the command.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
    public Task ExecuteAsync(Command<M> command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }

        var response = new WriteOnceBlock<object>(b => b);
        _journalWriter.Post(new CommandContext(command, response));
        return response.ReceiveAsync();
    }

    /// <summary>
    /// Submits a query. Queries bypass the journal and go directly to the dispatcher.
    /// </summary>
    /// <returns>A task that yields the query's result cast to <typeparamref name="R"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="query"/> is null.</exception>
    public async Task<R> ExecuteAsync<R>(Query<M, R> query)
    {
        if (query == null)
        {
            throw new ArgumentNullException("query");
        }

        var response = new WriteOnceBlock<object>(r => r);
        _dispatcher.Post(new QueryContext(query, response));
        return (R)await response.ReceiveAsync();
    }

    /// <summary>
    /// Blocking variant of <c>ExecuteAsync</c> for commands with a result.
    /// Failures surface wrapped in an <see cref="AggregateException"/>.
    /// </summary>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>
    /// Blocking variant of <c>ExecuteAsync</c> for commands without a result.
    /// Failures surface wrapped in an <see cref="AggregateException"/>.
    /// </summary>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Disposes the journal writer and, if it is disposable, the dispatcher.
    /// </summary>
    public void Dispose()
    {
        _journalWriter.Dispose();

        // The dispatcher is probed rather than assumed to be disposable, so
        // this is a no-op when it holds nothing to release.
        var disposableDispatcher = _dispatcher as IDisposable;
        if (disposableDispatcher != null)
        {
            disposableDispatcher.Dispose();
        }
    }
}
/// <summary>
/// Collects posted commands into batches, appends each batch to the journal
/// and then hands the batch to the dispatcher. A batch is emitted when it
/// reaches the configured size or when the timer fires, whichever is first.
/// </summary>
public class TplJournalWriter : IDisposable
{
    private readonly IJournalWriter _journalWriter;

    // One-shot timer, re-armed after every tick and after every batch.
    private readonly Timer _timer;

    // 0 until the first Post has armed the timer, 1 afterwards.
    private int _timerStarted;

    //commands start here
    private readonly BatchBlock<CommandContext> _requestQueue;

    //then go here at given intervals or when the batch size is reached
    private readonly ActionBlock<CommandContext[]> _writerBlock;

    //after journaling, commands are passed to the dispatcher for scheduling
    private readonly Dispatcher _dispatcher;

    //profiling stuff
    private readonly List<int> _batchSizes = new List<int>();
    private int _timerInvocations;

    /// <summary>
    /// Delay between arming the timer and the next forced batch. Read every
    /// time the timer is re-armed, so changes take effect on the next re-arm.
    /// </summary>
    public TimeSpan Interval { get; set; }

    /// <summary>
    /// Creates a writer with a 16 ms flush interval.
    /// </summary>
    /// <param name="journalWriter">Journal the command batches are appended to.</param>
    /// <param name="dispatcher">Receives each batch after its append has completed.</param>
    /// <param name="batchSize">Number of commands that make up a full batch.</param>
    public TplJournalWriter(IJournalWriter journalWriter, Dispatcher dispatcher, int batchSize)
    {
        Interval = TimeSpan.FromMilliseconds(16);
        _journalWriter = journalWriter;
        _dispatcher = dispatcher;
        _writerBlock = new ActionBlock<CommandContext[]>(batch => Go(batch));
        _requestQueue = new BatchBlock<CommandContext>(batchSize);
        _requestQueue.LinkTo(_writerBlock);

        // Created last and left idle (this constructor overload uses an
        // infinite due time): nothing can throw after this point, and the
        // timer only starts ticking once the first command is posted.
        _timer = new Timer(OnTimerTick);
    }

    /// <summary>
    /// Timer callback: forces out whatever is queued, counts the tick and
    /// schedules the next one.
    /// </summary>
    private void OnTimerTick(object state)
    {
        _requestQueue.TriggerBatch();
        // Go also re-arms the timer, so ticks are not guaranteed to be
        // strictly serialized; count atomically.
        Interlocked.Increment(ref _timerInvocations);
        SetTimer();
    }

    /// <summary>
    /// Arms the timer to fire once, <see cref="Interval"/> from now.
    /// </summary>
    private void SetTimer()
    {
        try
        {
            _timer.Change(Interval, TimeSpan.FromMilliseconds(-1));
        }
        catch (ObjectDisposedException)
        {
            // A tick or batch that was already in flight when Dispose ran;
            // there is nothing left to schedule.
        }
    }

    /// <summary>
    /// Queues a command for journaling. The first call starts the flush timer.
    /// </summary>
    public void Post(CommandContext request)
    {
        // Exactly one caller wins the exchange, so concurrent first posts
        // cannot start (or create) more than one timer.
        if (Interlocked.CompareExchange(ref _timerStarted, 1, 0) == 0)
        {
            SetTimer();
        }
        _requestQueue.Post(request);
    }

    /// <summary>
    /// Starts the journal append for a batch, forwards the batch to the
    /// dispatcher when the append task completes, and re-arms the timer.
    /// </summary>
    private void Go(CommandContext[] batch)
    {
        lock (_batchSizes)
        {
            _batchSizes.Add(batch.Length);
        }

        // NOTE(review): the continuation runs whether or not the append
        // succeeded, so a batch whose append failed is still dispatched, and
        // the append is not awaited before the next batch starts. Confirm the
        // intended failure and ordering semantics.
        _journalWriter.AppendAsync(batch.Select(ctx => ctx.Command))
            .ContinueWith(t => _dispatcher.Post(batch));
        SetTimer();
    }

    /// <summary>
    /// Stops the timer, prints the profiling counters (timer invocations and
    /// a histogram of batch sizes) and disposes the underlying journal writer.
    /// </summary>
    public void Dispose()
    {
        // Stop forcing batches before the journal goes away.
        _timer.Dispose();

        Console.WriteLine("Timer invocations:" + Volatile.Read(ref _timerInvocations));

        int[] sizes;
        lock (_batchSizes)
        {
            // Snapshot: a batch may still be in flight on the writer block.
            sizes = _batchSizes.ToArray();
        }

        var histogram = new SortedDictionary<int, int>();
        foreach (var size in sizes)
        {
            int seen;
            histogram.TryGetValue(size, out seen);
            histogram[size] = seen + 1;
        }
        foreach (var entry in histogram)
        {
            Console.WriteLine(entry.Key + ": " + entry.Value);
        }

        _journalWriter.Dispose();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment