Last active
December 16, 2015 05:19
-
-
Save philcleveland/5383804 to your computer and use it in GitHub Desktop.
GetEventStore Event Dispatcher, compatible with EventStore.ClientAPI v1.1.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Special thanks to Andrii Nakryiko and James Nugent | |
//for their help with this code. | |
namespace Infrastructure.EventStorage | |
{ | |
using System; | |
using System.Collections.Concurrent; | |
using System.Net; | |
using System.Text; | |
using System.Threading; | |
using EventStore.ClientAPI; | |
using Infrastructure.Messaging.Events; | |
using Newtonsoft.Json; | |
using Newtonsoft.Json.Linq; | |
/// <summary> | |
/// Event dispatcher for GetEventStore.
/// Compatible with EventStore.ClientAPI v1.1.0.
/// </summary> | |
public class GetEventStoreEventDispatcher | |
{ | |
private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000; | |
private readonly IEventBus _eventBus; | |
private EventStoreConnection _connection; | |
private readonly IPEndPoint _eventStoreTcpEndpoint; | |
private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>(); | |
private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true); | |
private volatile bool _stop; | |
private int _isPublishing; | |
private Position _lastProcessed; | |
private EventStoreAllCatchUpSubscription _subscription; | |
private volatile bool _isEventStoreConnected; | |
private readonly IPersistGetEventStorePosition _positionRepository; | |
/// <summary>
/// Creates a dispatcher that publishes events read from the event store onto
/// <paramref name="eventBus"/>. The constructor only builds the connection object and
/// loads the last processed position; <c>Connect</c> is called later from RecoverSubscription.
/// </summary>
/// <param name="eventStoreTcpEndpoint">TCP endpoint passed to <c>Connect</c> when the dispatcher starts.</param>
/// <param name="eventBus">Bus on which deserialized events are published.</param>
/// <param name="positionRepository">Store used to load and persist the last processed position.</param>
/// <exception cref="ArgumentNullException">Any of the three arguments is null.</exception>
public GetEventStoreEventDispatcher(IPEndPoint eventStoreTcpEndpoint, IEventBus eventBus, IPersistGetEventStorePosition positionRepository)
{
    if (eventBus == null) throw new ArgumentNullException("eventBus");
    if (positionRepository == null) throw new ArgumentNullException("positionRepository");
    // The endpoint was previously unchecked, so a null only surfaced later inside Connect.
    // Checked last so the exceptions for the other two arguments are unchanged.
    if (eventStoreTcpEndpoint == null) throw new ArgumentNullException("eventStoreTcpEndpoint");

    var connSettings = ConnectionSettings.Create()
        .OnConnected(HandleGetEventStoreConnected)
        .OnDisconnected(HandleGetEventStoreDisconnected)
        .OnErrorOccurred(HandleGetEventStoreConnectionError)
        .KeepReconnecting()
        .KeepRetrying();

    _connection = EventStoreConnection.Create(connSettings);
    _eventStoreTcpEndpoint = eventStoreTcpEndpoint;
    _eventBus = eventBus;
    _positionRepository = positionRepository;

    // Resume from wherever the repository says we got to last time.
    _lastProcessed = _positionRepository.GetLastProcessedPosition();
}
/// <summary>
/// Starts dispatching by delegating to RecoverSubscription, which connects
/// (when not already connected) and subscribes from the last processed position.
/// </summary>
public void StartDispatching()
{
    this.RecoverSubscription();
}
/// <summary>
/// Stops dispatching: raises the stop flag, closes the connection, stops the
/// subscription if one was created, then waits for the publishing loop to signal it is idle.
/// </summary>
/// <exception cref="TimeoutException">
/// The publishing loop did not signal completion within THREAD_KILL_TIMEOUT_MILLISEC milliseconds.
/// </exception>
public void StopDispatching()
{
    // Set the flag first so the publishing loop and EnsurePublishEvents see it.
    _stop = true;
    _connection.Close();

    if (_subscription != null)
    {
        _subscription.Stop(TimeSpan.Zero);
    }

    bool publisherIdle = _liveDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC);
    if (publisherIdle)
    {
        return;
    }

    throw new TimeoutException("Unable to stop dispatching in time.");
}
/// <summary>
/// Callback registered with <c>OnErrorOccurred</c> in the constructor.
/// Currently a no-op: the error is neither logged nor rethrown.
/// </summary>
/// <param name="connection">Connection that reported the error (unused).</param>
/// <param name="ex">The reported error (unused).</param>
private void HandleGetEventStoreConnectionError(EventStoreConnection connection, Exception ex)
{
    //TODO: log the error on a passed in ILogger
}
/// <summary>
/// Callback registered with <c>OnConnected</c> in the constructor. Marks the
/// connection as up and creates the subscription if none exists yet.
/// </summary>
/// <param name="connection">The connection that was established; stored in <c>_connection</c>.</param>
private void HandleGetEventStoreConnected(EventStoreConnection connection)
{
    _isEventStoreConnected = true;
    _connection = connection; //TODO: not sure if I need to do this

    // A subscription already exists, so there is nothing to recover.
    if (_subscription != null)
    {
        return;
    }

    RecoverSubscription();
}
/// <summary>
/// Callback registered with <c>OnDisconnected</c> in the constructor. Records the
/// connection and clears the connected flag so RecoverSubscription reconnects next time.
/// </summary>
/// <param name="connection">The connection that was lost; stored in <c>_connection</c>.</param>
private void HandleGetEventStoreDisconnected(EventStoreConnection connection)
{
    this._connection = connection; //TODO: not sure if I need to do this
    this._isEventStoreConnected = false;
}
/// <summary>
/// Callback passed to <c>SubscribeToAllFrom</c>; invoked when the subscription is dropped.
/// Re-creates the subscription unless the dispatcher has been asked to stop.
/// </summary>
/// <param name="subscription">The subscription that was dropped (unused).</param>
/// <param name="reason">Drop reason supplied by the client API (unused).</param>
/// <param name="error">Error associated with the drop, if any (unused).</param>
private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, string reason, Exception error)
{
    // StopDispatching sets _stop and then closes the connection and stops the subscription.
    // Recovering here would reconnect and resubscribe right after the caller asked to stop.
    // NOTE(review): assumes the drop callback fires on Stop/Close; confirm against the ClientAPI.
    if (_stop)
    {
        return;
    }

    RecoverSubscription();
}
/// <summary>
/// Connects to the event store when the connected flag is not set, then
/// (re)creates the all-events subscription starting from <c>_lastProcessed</c>.
/// </summary>
private void RecoverSubscription()
{
    bool needsConnect = !_isEventStoreConnected;
    if (needsConnect)
    {
        _connection.Connect(_eventStoreTcpEndpoint);
    }

    // Second argument is passed as false; presumably the resolve-link-tos switch (confirm against the ClientAPI).
    _subscription = _connection.SubscribeToAllFrom(
        _lastProcessed,
        false,
        HandleNewEvent,
        HandleSubscriptionDropped);
}
/// <summary>
/// Callback passed to <c>SubscribeToAllFrom</c>; invoked for each event delivered by
/// the subscription. Queues the event and makes sure a publishing loop is running.
/// </summary>
/// <param name="subscription">The subscription that delivered the event (unused).</param>
/// <param name="event">The event to queue for publishing.</param>
private void HandleNewEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
{
    this._liveQueue.Enqueue(@event);
    this.EnsurePublishEvents(this._liveQueue, this._liveDone);
}
/// <summary>
/// Schedules PublishEvents on the thread pool unless the dispatcher is stopping or a
/// publishing loop already holds the <c>_isPublishing</c> flag.
/// </summary>
/// <param name="queue">Queue the publishing loop will drain.</param>
/// <param name="doneEvent">Event the publishing loop resets while working and sets when idle.</param>
private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
    // Short-circuit keeps the original order: the flag is only touched when not stopping.
    bool acquired = !_stop && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
    if (acquired)
    {
        ThreadPool.QueueUserWorkItem(state => PublishEvents(queue, doneEvent));
    }
}
/// <summary>
/// Drains <paramref name="queue"/>, publishing each event whose position is newer than
/// <c>_lastProcessed</c> and persisting the new position after each one. Runs on a
/// thread-pool thread (queued by EnsurePublishEvents) while <c>_isPublishing</c> is 1.
/// </summary>
/// <param name="queue">Queue of events received from the subscription.</param>
/// <param name="doneEvent">Reset while a drain pass is running; set when the pass ends.</param>
private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
    bool keepGoing = true;
    while (keepGoing)
    {
        doneEvent.Reset(); // signal we start processing this queue
        try
        {
            // Checked after Reset so StopDispatching, which waits on doneEvent, is released by the finally below.
            if (_stop)
            {
                return;
            }

            ResolvedEvent evnt;
            while (!_stop && queue.TryDequeue(out evnt))
            {
                // Skip events at or before the last processed position so nothing is processed twice.
                // The nullable comparison is false when OriginalPosition has no value.
                if (evnt.OriginalPosition > _lastProcessed)
                {
                    var processedEvent = ProcessRawEvent(evnt);
                    if (processedEvent != null)
                    {
                        _eventBus.Publish(processedEvent);
                    }

                    _lastProcessed = evnt.OriginalPosition.Value;
                    _positionRepository.PersistLastPositionProcessed(_lastProcessed);
                }
            }
        }
        finally
        {
            // Always signal the end of the pass and release the flag, even when the bus or the
            // position repository throws; otherwise the flag stays at 1 and doneEvent stays unset.
            doneEvent.Set();
            Interlocked.CompareExchange(ref _isPublishing, 0, 1);
        }

        // Events may have arrived between the last TryDequeue and the release above;
        // try to reacquire the flag and run another pass if so.
        keepGoing = !_stop && !queue.IsEmpty && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
    }
}
/// <summary>
/// Converts a raw event into an <c>IEvent</c>.
/// </summary>
/// <param name="rawEvent">Event as delivered by the subscription.</param>
/// <returns>
/// The deserialized event, or null when the metadata or data is empty or the
/// deserialized object is not an <c>IEvent</c>.
/// </returns>
private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
{
    var recorded = rawEvent.OriginalEvent;

    // Both metadata and data are needed to deserialize the event.
    if (recorded.Metadata.Length == 0 || recorded.Data.Length == 0)
    {
        return null;
    }

    object deserialized = DeserializeEvent(recorded.Metadata, recorded.Data);
    return deserialized as IEvent;
}
/// <summary>
/// Deserializes the event from the raw GetEventStore event to my event.
/// Took this from a gist that James Nugent posted on the GetEventStore forums.
/// </summary>
/// <param name="metadata">UTF-8 JSON metadata; the CLR type name is read from its "EventClrTypeName" property.</param>
/// <param name="data">UTF-8 JSON payload of the event.</param>
/// <returns>
/// The payload deserialized as the CLR type named in the metadata, or null when the
/// metadata has no type name or the named type cannot be resolved.
/// </returns>
private static object DeserializeEvent(byte[] metadata, byte[] data)
{
    const string EventClrTypeHeader = "EventClrTypeName";

    // Property() returns null when the header is absent; dereferencing it threw NullReferenceException.
    var typeProperty = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader);
    if (typeProperty == null)
    {
        return null;
    }

    var eventClrTypeName = (string)typeProperty.Value;
    if (string.IsNullOrEmpty(eventClrTypeName))
    {
        return null;
    }

    // Type.GetType returns null for a type that is not loadable in this process.
    // Return null so the caller skips the event instead of deserializing without a target type.
    var eventType = Type.GetType(eventClrTypeName);
    if (eventType == null)
    {
        return null;
    }

    return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), eventType);
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Phil,
What changed from the old one https://gist.github.com/pdoh00/4744120?
Kind and regards.