Last active
June 8, 2016 23:04
-
-
Save philcleveland/4744120 to your computer and use it in GitHub Desktop.
Event dispatcher which receives events from the GetEventStore after they are saved. It takes the saved events and publishes them to the passed in Event Bus. This ensures that events are not published until they are saved in the GetEventStore. Big thanks to Andrii for all the reviews and coding help to get this thing working.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Dispatches events to an <see cref="IEventBus"/> after they have been saved in the GetEventStore.
/// History is pulled from the store first; once the dispatcher has caught up, events pushed by a
/// subscription to all streams are published as they arrive.
/// </summary>
public class GetEventStoreEventDispatcher
{
    private const int RECONNECT_TIMEOUT_MILLISEC = 5000;
    private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000;
    private const int READ_PAGE_SIZE = 500;
    private const int LIVE_QUEUE_SIZE_LIMIT = 10000;

    private readonly IEventBus _eventBus;
    private readonly EventStoreConnection _store;

    // Events pushed by the subscription. Each "done" event is set whenever no worker is
    // draining the corresponding queue.
    private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();
    private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);

    // Events pulled by ReadHistoricalEventsFrom.
    private readonly ConcurrentQueue<ResolvedEvent> _historicalQueue = new ConcurrentQueue<ResolvedEvent>();
    private readonly ManualResetEventSlim _historicalDone = new ManualResetEventSlim(true);

    private volatile bool _stop;
    private volatile bool _livePublishingAllowed;

    // 1 while a publishing worker is scheduled or running, 0 otherwise. Shared by both queues,
    // so at most one worker publishes at any time.
    private int _isPublishing;

    // Position of the last event handled by a worker; (-1, -1) means nothing was processed yet.
    private Position _lastProcessed;
    private EventStoreSubscription _subscription;

    /// <summary>
    /// Creates a dispatcher that reads from <paramref name="store"/> and publishes to
    /// <paramref name="eventBus"/>.
    /// </summary>
    /// <param name="store">Connection used for reading history and subscribing.</param>
    /// <param name="eventBus">Bus that receives the deserialized events.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public GetEventStoreEventDispatcher(EventStoreConnection store, IEventBus eventBus)
    {
        if (store == null)
        {
            throw new ArgumentNullException("store");
        }
        if (eventBus == null)
        {
            throw new ArgumentNullException("eventBus");
        }

        _store = store;
        _eventBus = eventBus;
        _lastProcessed = new Position(-1, -1);
    }

    // Credit algorithm to Szymon Pobiega
    // http://simon-says-architecture.com/2013/02/02/mechanics-of-durable-subscription/#comments
    // 1. The subscriber always starts with pull, assuming there were some messages generated while it was offline.
    // 2. The subscriber pulls messages until there's nothing left to pull (it is up to date with the stream).
    // 3. Push subscription is started, but arriving messages are not processed immediately; they are temporarily redirected to a buffer.
    // 4. One last pull is done to ensure nothing happened between steps 2 and 3.
    // 5. Messages from this last pull are processed.
    // 6. Processing messages from the push buffer is started. While messages are processed, they are checked against
    //    the messages processed in step 5 to ensure there are no duplicates.
    // 7. The system works in push model until the subscriber is killed or the publisher drops the push subscription.
    //
    // Credit to Andrii Nakryiko
    // If data is written to storage at such a speed that, between the moment you did your last pull read and the
    // moment you subscribed to push notifications, more events were generated than you request in one pull request,
    // you need to repeat steps 4-5 a few times until you get a pull message whose position is >= the subscription
    // position (EventStore provides you with those positions).

    /// <summary>
    /// Catches up with the store and then switches to live publishing. Blocks until the
    /// historical events that were read have been handed to the event bus.
    /// </summary>
    public void StartDispatching()
    {
        RecoverSubscription();
    }

    /// <summary>
    /// Pauses live publishing, replays everything after the last processed position, opens a new
    /// subscription and resumes live publishing.
    /// </summary>
    private void RecoverSubscription()
    {
        // The live worker re-checks this flag before every dequeue, so it stops after the event
        // it is currently handling and cannot publish new pushed events ahead of the history.
        _livePublishingAllowed = false;
        _liveDone.Wait(); // wait until the live worker has stopped and _lastProcessed is up to date

        //AN: if _lastProcessed == (-1, -1) then we haven't processed anything yet, so we start from Position.Start
        var startPos = _lastProcessed == new Position(-1, -1) ? Position.Start : _lastProcessed;
        var nextPos = ReadHistoricalEventsFrom(startPos);

        _subscription = SubscribeToAll();

        // Pull once more: events written between the first pull and the subscription.
        ReadHistoricalEventsFrom(nextPos);
        WaitForHistoricalQueueToDrain();

        _livePublishingAllowed = true;
        EnsurePublishEvents(_liveQueue, _liveDone);
    }

    /// <summary>
    /// Blocks until every queued historical event has been handled (or dispatching is stopped).
    /// </summary>
    /// <remarks>
    /// Waiting on <c>_historicalDone</c> alone is not sufficient: an EnsurePublishEvents call is a
    /// no-op while another worker still owns <c>_isPublishing</c>, and a finishing worker signals
    /// "done" just before it re-checks its queue. So the queue itself is re-checked after every
    /// wait and a worker is (re)started while anything is left in it.
    /// </remarks>
    private void WaitForHistoricalQueueToDrain()
    {
        var spinner = new SpinWait();
        _historicalDone.Wait();
        while (!_stop && !_historicalQueue.IsEmpty)
        {
            EnsurePublishEvents(_historicalQueue, _historicalDone);
            _historicalDone.Wait();
            spinner.SpinOnce(); // back off briefly if the publishing flag was momentarily taken
        }
    }

    /// <summary>
    /// Stops dispatching: no further events are published, the subscription (if any) is closed
    /// and the call waits for running workers to finish.
    /// </summary>
    /// <exception cref="TimeoutException">
    /// A worker did not finish within THREAD_KILL_TIMEOUT_MILLISEC.
    /// </exception>
    public void StopDispatching()
    {
        _stop = true;
        if (_subscription != null)
        {
            _subscription.Unsubscribe();
        }

        // Workers check _stop before every dequeue and always signal their "done" event on exit.
        if (!_historicalDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
        {
            throw new TimeoutException("Unable to stop dispatching in time.");
        }
        if (!_liveDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
        {
            throw new TimeoutException("Unable to stop dispatching in time.");
        }
    }

    /// <summary>
    /// Reads all events forward from <paramref name="from"/> in pages of READ_PAGE_SIZE, queues
    /// them for publishing and keeps going until the store returns an empty page or dispatching
    /// is stopped.
    /// </summary>
    /// <param name="from">Position to start reading at.</param>
    /// <returns>The position at which the next read should start.</returns>
    private Position ReadHistoricalEventsFrom(Position from)
    {
        var position = from;
        AllEventsSlice slice;
        while (!_stop && (slice = _store.ReadAllEventsForward(position, READ_PAGE_SIZE, false)).Events.Length > 0)
        {
            foreach (var rawEvent in slice.Events)
            {
                _historicalQueue.Enqueue(rawEvent);
            }
            EnsurePublishEvents(_historicalQueue, _historicalDone);
            position = slice.NextPosition;
        }
        return position;
    }

    /// <summary>
    /// Subscribes to all streams and waits up to RECONNECT_TIMEOUT_MILLISEC for the subscription
    /// to be confirmed.
    /// </summary>
    /// <exception cref="TimeoutException">The subscription was not confirmed in time.</exception>
    private EventStoreSubscription SubscribeToAll()
    {
        //TODO: Before trying to resubscribe - how to ensure that store is active and ready to accept.
        //AN: EventStoreConnection automatically tries to connect (if not already connected) to EventStore,
        //so you don't have to do something manually.
        //Though in case of errors, you need to do some actions (if EventStore server is down or not yet up, etc.)
        var task = _store.SubscribeToAll(false, HandleEventAppeared, HandleSubscriptionDropped);
        if (!task.Wait(RECONNECT_TIMEOUT_MILLISEC))
        {
            throw new TimeoutException("Could not reconnect after the subscription was dropped");
        }
        return task.Result;
    }

    /// <summary>
    /// Subscription callback: buffers the pushed event and, when live publishing is allowed,
    /// makes sure a worker is draining the live queue.
    /// </summary>
    private void HandleEventAppeared(ResolvedEvent rawEvent)
    {
        if (_stop)
        {
            return;
        }

        _liveQueue.Enqueue(rawEvent);

        if (_livePublishingAllowed)
        {
            EnsurePublishEvents(_liveQueue, _liveDone);
            return;
        }

        // Prevent live queue memory explosion while catching up: drop the oldest buffered event.
        // NOTE(review): this relies on the dropped event also being returned by the historical
        // pull that is in progress - confirm for very high write rates.
        if (_liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
        {
            ResolvedEvent throwAwayEvent;
            _liveQueue.TryDequeue(out throwAwayEvent);
        }
    }

    /// <summary>
    /// Subscription callback: re-runs the catch-up procedure unless dispatching was stopped.
    /// </summary>
    private void HandleSubscriptionDropped()
    {
        if (_stop)
        {
            return;
        }
        RecoverSubscription();
    }

    /// <summary>
    /// Schedules a thread-pool worker for <paramref name="queue"/> unless dispatching is stopped
    /// or a worker already owns the publishing flag.
    /// </summary>
    private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
    {
        if (_stop)
        {
            return;
        }
        if (Interlocked.CompareExchange(ref _isPublishing, 1, 0) != 0)
        {
            return; // a worker is already scheduled or running
        }

        // Reset here, on the scheduling thread, and not inside the worker: otherwise a thread that
        // waits on doneEvent right after this call could get through before the worker has started.
        doneEvent.Reset();
        ThreadPool.QueueUserWorkItem(_ => PublishEvents(queue, doneEvent));
    }

    /// <summary>
    /// Tells whether a worker may currently take events from <paramref name="queue"/>: never
    /// after a stop, and from the live queue only while live publishing is allowed.
    /// </summary>
    private bool MayPublishFrom(ConcurrentQueue<ResolvedEvent> queue)
    {
        if (_stop)
        {
            return false;
        }
        return !ReferenceEquals(queue, _liveQueue) || _livePublishingAllowed;
    }

    /// <summary>
    /// Worker body. Expects the caller to own <c>_isPublishing</c> and to have reset
    /// <paramref name="doneEvent"/>; drains <paramref name="queue"/>, publishes every event
    /// positioned after <c>_lastProcessed</c>, then signals <paramref name="doneEvent"/> and
    /// gives the flag back.
    /// </summary>
    private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
    {
        bool keepGoing = true;
        while (keepGoing)
        {
            ResolvedEvent evnt;
            while (MayPublishFrom(queue) && queue.TryDequeue(out evnt))
            {
                if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
                {
                    var processedEvent = ProcessRawEvent(evnt);
                    if (processedEvent != null)
                    {
                        _eventBus.Publish(processedEvent);
                    }
                    _lastProcessed = evnt.OriginalPosition.Value;
                }
            }

            doneEvent.Set(); // signal end of processing particular queue
            Interlocked.CompareExchange(ref _isPublishing, 0, 1);

            // An event may have been queued after the last TryDequeue while the flag was still
            // ours (its EnsurePublishEvents call was then a no-op), so try to take the flag again.
            keepGoing = MayPublishFrom(queue)
                        && queue.Count > 0
                        && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
            if (keepGoing)
            {
                doneEvent.Reset(); // we are processing this queue again
            }
        }
    }

    /// <summary>
    /// Converts a stored event into an <see cref="IEvent"/>.
    /// </summary>
    /// <returns>
    /// The deserialized event, or null when the stored event has no metadata or no data, or when
    /// its metadata does not name a CLR type.
    /// </returns>
    private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
    {
        if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
        {
            return DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
        }
        return null;
    }

    /// <summary>
    /// Deserializes the event from the raw GetEventStore event to my event.
    /// Took this from a gist that James Nugent posted on the GetEventStore forums.
    /// </summary>
    /// <param name="metadata">UTF-8 JSON metadata; the "EventClrTypeName" property names the CLR type of the event.</param>
    /// <param name="data">UTF-8 JSON body of the event.</param>
    /// <returns>
    /// The deserialized event, or null when the metadata has no "EventClrTypeName" property.
    /// </returns>
    private static IEvent DeserializeEvent(byte[] metadata, byte[] data)
    {
        const string EventClrTypeHeader = "EventClrTypeName";

        var typeHeader = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader);
        if (typeHeader == null)
        {
            // The subscription covers all streams, so events that were not written with this
            // header can show up here. They are skipped instead of failing with a
            // NullReferenceException on the worker thread.
            return null;
        }

        var eventClrTypeName = typeHeader.Value;
        return (IEvent)JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment