Skip to content

Instantly share code, notes, and snippets.

@philcleveland
Last active December 16, 2015 05:19
Show Gist options
  • Save philcleveland/5383804 to your computer and use it in GitHub Desktop.
Save philcleveland/5383804 to your computer and use it in GitHub Desktop.
GetEventStore Event Dispatcher, compatible with EventStore.ClientAPI v1.1.0
//Special thanks to Andrii Nakryiko and James Nugent
//for their help with this code.
namespace Infrastructure.EventStorage
{
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;
using EventStore.ClientAPI;
using Infrastructure.Messaging.Events;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
/// <summary>
/// Event dispatcher for GetEventStore.
/// Compatible with EventStore.ClientAPI v1.1.0.
/// </summary>
/// <remarks>
/// Opens a catch-up subscription over all events starting at the position returned by
/// the position repository, queues every received event, and drains that queue on a
/// single thread-pool worker at a time. Each drained event is deserialized, published
/// on the event bus, and its position is persisted.
/// The stop flag is never reset, so an instance does not resume publishing once
/// <see cref="StopDispatching"/> has been called.
/// </remarks>
public class GetEventStoreEventDispatcher
{
    // Maximum time StopDispatching waits for the publishing worker to go idle.
    private const int ThreadKillTimeoutMilliseconds = 5000;

    private readonly IEventBus _eventBus;
    private EventStoreConnection _connection;
    private readonly IPEndPoint _eventStoreTcpEndpoint;
    private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();

    // Set while no worker is draining the queue; reset for the duration of a drain pass.
    private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);

    private volatile bool _stop;

    // 0 = no publishing worker active, 1 = a worker owns the queue (guarded with Interlocked).
    private int _isPublishing;

    private Position _lastProcessed;
    private EventStoreAllCatchUpSubscription _subscription;
    private volatile bool _isEventStoreConnected;
    private readonly IPersistGetEventStorePosition _positionRepository;

    /// <summary>
    /// Creates the dispatcher, builds the (not yet connected) event store connection and
    /// loads the last processed position from <paramref name="positionRepository"/>.
    /// </summary>
    /// <param name="eventStoreTcpEndpoint">TCP endpoint passed to the connection when connecting.</param>
    /// <param name="eventBus">Bus on which deserialized events are published.</param>
    /// <param name="positionRepository">Store used to read and persist the last processed position.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public GetEventStoreEventDispatcher(IPEndPoint eventStoreTcpEndpoint, IEventBus eventBus, IPersistGetEventStorePosition positionRepository)
    {
        // Fail here rather than later inside Connect, where the cause is harder to trace.
        if (eventStoreTcpEndpoint == null) throw new ArgumentNullException("eventStoreTcpEndpoint");
        if (eventBus == null) throw new ArgumentNullException("eventBus");
        if (positionRepository == null) throw new ArgumentNullException("positionRepository");

        var connSettings = ConnectionSettings.Create()
            .OnConnected(HandleGetEventStoreConnected)
            .OnDisconnected(HandleGetEventStoreDisconnected)
            .OnErrorOccurred(HandleGetEventStoreConnectionError)
            .KeepReconnecting()
            .KeepRetrying();

        _connection = EventStoreConnection.Create(connSettings);
        _eventStoreTcpEndpoint = eventStoreTcpEndpoint;
        _eventBus = eventBus;
        _positionRepository = positionRepository;
        _lastProcessed = _positionRepository.GetLastProcessedPosition();
    }

    /// <summary>
    /// Connects (if not already connected) and subscribes from the last processed position.
    /// </summary>
    public void StartDispatching()
    {
        RecoverSubscription();
    }

    /// <summary>
    /// Signals the worker to stop, closes the connection, stops the subscription and waits
    /// for the publishing worker to become idle.
    /// </summary>
    /// <exception cref="TimeoutException">
    /// The worker did not become idle within <c>ThreadKillTimeoutMilliseconds</c>.
    /// </exception>
    public void StopDispatching()
    {
        // Set first so the drop/connect callbacks triggered below do not resubscribe.
        _stop = true;
        _connection.Close();

        if (_subscription != null)
        {
            _subscription.Stop(TimeSpan.Zero);
        }

        if (!_liveDone.Wait(ThreadKillTimeoutMilliseconds))
        {
            throw new TimeoutException("Unable to stop dispatching in time.");
        }
    }

    private void HandleGetEventStoreConnectionError(EventStoreConnection connection, Exception ex)
    {
        //TODO: log the error on a passed in ILogger
    }

    private void HandleGetEventStoreConnected(EventStoreConnection connection)
    {
        _isEventStoreConnected = true;
        _connection = connection; //TODO: not sure if I need to do this

        // Do not start a subscription once a stop has been requested.
        if (_stop)
        {
            return;
        }

        // NOTE(review): if this callback fires while StartDispatching is still inside
        // RecoverSubscription, both paths may subscribe; duplicates are then filtered by
        // the position check in PublishEvents. Confirm against the client's threading model.
        if (_subscription == null)
        {
            RecoverSubscription();
        }
    }

    private void HandleGetEventStoreDisconnected(EventStoreConnection connection)
    {
        _connection = connection; //TODO: not sure if I need to do this
        _isEventStoreConnected = false;
    }

    private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, string reason, Exception error)
    {
        // A drop caused by StopDispatching must not reconnect and resubscribe.
        if (_stop)
        {
            return;
        }

        RecoverSubscription();
    }

    /// <summary>
    /// Connects when the connection is not flagged as connected, then (re)creates the
    /// subscription starting at the last processed position.
    /// </summary>
    private void RecoverSubscription()
    {
        if (!_isEventStoreConnected)
        {
            _connection.Connect(_eventStoreTcpEndpoint);
        }

        _subscription = _connection.SubscribeToAllFrom(_lastProcessed, false, HandleNewEvent, HandleSubscriptionDropped);
    }

    private void HandleNewEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
    {
        _liveQueue.Enqueue(@event);
        EnsurePublishEvents(_liveQueue, _liveDone);
    }

    /// <summary>
    /// Schedules a publishing worker on the thread pool unless one already owns the queue
    /// or a stop has been requested.
    /// </summary>
    private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
    {
        if (_stop)
        {
            return;
        }

        // Only the caller that flips 0 -> 1 schedules the worker.
        if (Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0)
        {
            ThreadPool.QueueUserWorkItem(_ => PublishEvents(queue, doneEvent));
        }
    }

    /// <summary>
    /// Drains <paramref name="queue"/>, publishing each event whose position is beyond the
    /// last processed one and persisting that position. Must only run while the caller
    /// holds the <c>_isPublishing</c> flag; the flag is released before returning.
    /// </summary>
    private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
    {
        bool keepGoing = true;
        while (keepGoing)
        {
            doneEvent.Reset(); // signal we start processing this queue
            try
            {
                // When _stop is already set the loop body never runs and the finally
                // block below performs the same release the original early-exit did.
                ResolvedEvent evnt;
                while (!_stop && queue.TryDequeue(out evnt))
                {
                    if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
                    {
                        var processedEvent = ProcessRawEvent(evnt);
                        if (processedEvent != null)
                        {
                            _eventBus.Publish(processedEvent);
                        }

                        _lastProcessed = evnt.OriginalPosition.Value;
                        _positionRepository.PersistLastPositionProcessed(_lastProcessed);
                    }
                }
            }
            finally
            {
                // Always release, even when publishing/persisting throws; otherwise the
                // flag stays at 1, no worker is ever scheduled again and StopDispatching
                // can only time out.
                doneEvent.Set(); // signal end of processing particular queue
                Interlocked.Exchange(ref _isPublishing, 0);
            }

            // Events may have been enqueued between the last TryDequeue and the release
            // above (their EnsurePublishEvents call saw the flag still taken), so try to
            // reacquire the flag and go around again.
            keepGoing = !_stop && !queue.IsEmpty && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
        }
    }

    /// <summary>
    /// Converts a raw event into an <see cref="IEvent"/>.
    /// </summary>
    /// <returns>
    /// The deserialized event, or <c>null</c> when metadata or data is empty, or the
    /// payload does not deserialize to an <see cref="IEvent"/>.
    /// </returns>
    private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
    {
        if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
        {
            var @event = DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
            return @event as IEvent;
        }

        return null;
    }

    /// <summary>
    /// Deserializes the event from the raw GetEventStore event to my event.
    /// Took this from a gist that James Nugent posted on the GetEventStore forums.
    /// </summary>
    /// <param name="metadata">UTF-8 JSON metadata; the CLR type name is read from its "EventClrTypeName" property.</param>
    /// <param name="data">UTF-8 JSON payload of the event.</param>
    /// <returns>
    /// The deserialized object, or <c>null</c> when the metadata has no usable
    /// "EventClrTypeName" value or the named type cannot be resolved.
    /// </returns>
    private static object DeserializeEvent(byte[] metadata, byte[] data)
    {
        const string EventClrTypeHeader = "EventClrTypeName";

        // Events without the header cannot be mapped to a CLR type, so they are skipped
        // rather than dereferencing a missing property.
        var typeProperty = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader);
        if (typeProperty == null)
        {
            return null;
        }

        var eventClrTypeName = (string)typeProperty.Value;
        if (string.IsNullOrEmpty(eventClrTypeName))
        {
            return null;
        }

        var eventType = Type.GetType(eventClrTypeName);
        if (eventType == null)
        {
            return null;
        }

        return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), eventType);
    }
}
}
@gblmarquez
Copy link

Phil,

What changed from the old one https://gist.github.com/pdoh00/4744120?

Kind regards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment