Created
November 3, 2014 21:14
-
-
Save rofr/0605c4fdf214c22014a4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Dispatcher | |
{ | |
private Kernel _kernel; | |
readonly BufferBlock<CommandContext[]> _commandQueue; | |
readonly BatchBlock<QueryContext> _queryQueue; | |
readonly ActionBlock<object> _executor; | |
readonly Timer _timer; | |
/// <summary> | |
/// Maximum latency for queries | |
/// </summary> | |
public TimeSpan Interval = TimeSpan.FromMilliseconds(1); | |
public int MaxConcurrentQueries = 4; | |
public Dispatcher(Kernel kernel) | |
{ | |
_kernel = kernel; | |
_commandQueue = new BufferBlock<CommandContext[]>(); | |
_queryQueue = new BatchBlock<QueryContext>(MaxConcurrentQueries); | |
_executor = new ActionBlock<object>(t => | |
{ | |
if (t is QueryContext[]) | |
{ | |
var queries = t as QueryContext[]; | |
Task[] tasks = queries.Select(q => Task.Factory.StartNew(_ => ExecuteQuery(q), null)).ToArray(); | |
Task.WaitAll(tasks); | |
} | |
else if (t is CommandContext[]) | |
{ | |
var commands = t as CommandContext[]; | |
foreach (var commandContext in commands) | |
{ | |
var result = _kernel.Execute(commandContext.Command); | |
commandContext.Response.Post(result); | |
} | |
} | |
}); | |
_commandQueue.LinkTo(_executor); | |
_queryQueue.LinkTo(_executor); | |
_timer = new Timer(_ => _queryQueue.TriggerBatch()); | |
_timer.Change(Interval, Interval); | |
} | |
void ExecuteQuery(QueryContext context) | |
{ | |
var result = _kernel.Execute(context.Query); | |
context.Response.Post(result); | |
} | |
internal void Post(CommandContext[] commands) | |
{ | |
_commandQueue.Post(commands); | |
} | |
internal void Post(QueryContext query) | |
{ | |
_queryQueue.Post(query); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Prevayler<M> : IDisposable | |
{ | |
private TplJournalWriter _journalWriter; | |
private Dispatcher _dispatcher; | |
public Prevayler(M model, int batchSize) | |
{ | |
// the kernel is an origodb component which | |
var kernel = new Kernel(model); | |
_dispatcher = new Dispatcher(kernel); | |
var eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113)); | |
eventStore.ConnectAsync().Wait(); | |
var journalWriter = new EventStoreJournal(eventStore); | |
_journalWriter = new TplJournalWriter(journalWriter, _dispatcher, batchSize); | |
} | |
public async Task<R> ExecuteAsync<R>(Command<M,R> command) | |
{ | |
var response = new WriteOnceBlock<object>(r => r); | |
_journalWriter.Post(new CommandContext(command, response)); | |
return (R) await response.ReceiveAsync(); | |
} | |
public Task ExecuteAsync(Command<M> command) | |
{ | |
var response = new WriteOnceBlock<object>(b => b); | |
_journalWriter.Post(new CommandContext(command, response)); | |
return response.ReceiveAsync(); | |
} | |
public async Task<R> ExecuteAsync<R>(Query<M, R> query) | |
{ | |
var response = new WriteOnceBlock<object>(r => r); | |
_dispatcher.Post(new QueryContext(query, response)); | |
return (R)await response.ReceiveAsync(); | |
} | |
public R Execute<R>(Command<M, R> command) | |
{ | |
return ExecuteAsync(command).Result; | |
} | |
public void Execute(Command<M> command) | |
{ | |
ExecuteAsync(command).Wait(); | |
} | |
public void Dispose() | |
{ | |
_journalWriter.Dispose(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TplJournalWriter : IDisposable | |
{ | |
private readonly IJournalWriter _journalWriter; | |
private Timer _timer; | |
//commands start here | |
private readonly BatchBlock<CommandContext> _requestQueue; | |
//then go here at given intervals or when the batch size is reached | |
private ActionBlock<CommandContext[]> _writerBlock; | |
//after journaling, commands are passed to the dispatcher for scheduling | |
private readonly Dispatcher _dispatcher; | |
//profiling stuff | |
private List<int> _batchSizes = new List<int>(); | |
private int _timerInvocations = 0; | |
public TimeSpan Interval { get; set; } | |
public TplJournalWriter(IJournalWriter journalWriter, Dispatcher dispatcher, int batchSize) | |
{ | |
Interval = TimeSpan.FromMilliseconds(16); | |
_journalWriter = journalWriter; | |
_dispatcher = dispatcher; | |
_writerBlock = new ActionBlock<CommandContext[]>(batch => Go(batch)); | |
_requestQueue = new BatchBlock<CommandContext>(batchSize); | |
_requestQueue.LinkTo(_writerBlock); | |
} | |
private void OnTimerTick(object state) | |
{ | |
_requestQueue.TriggerBatch(); | |
_timerInvocations++; | |
SetTimer(); | |
} | |
private void SetTimer() | |
{ | |
_timer.Change(Interval, TimeSpan.FromMilliseconds(-1)); | |
} | |
public void Post(CommandContext request) | |
{ | |
if (_timer == null) | |
{ | |
_timer = new Timer(OnTimerTick); | |
SetTimer(); | |
} | |
_requestQueue.Post(request); | |
} | |
private void Go(CommandContext[] batch) | |
{ | |
_batchSizes.Add(batch.Length); | |
_journalWriter.AppendAsync(batch.Select(ctx => ctx.Command)) | |
.ContinueWith(t => _dispatcher.Post(batch)); | |
SetTimer(); | |
} | |
public void Dispose() | |
{ | |
Console.WriteLine("Timer invocations:" + _timerInvocations); | |
var histogram = new SortedDictionary<int, int>(); | |
foreach (var count in _batchSizes) | |
{ | |
if (!histogram.ContainsKey(count)) histogram[count] = 1; | |
else histogram[count]++; | |
} | |
foreach (var key in histogram.Keys) | |
{ | |
Console.WriteLine(key + ": " + histogram[key]); | |
} | |
_journalWriter.Dispose(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment