Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jen20/6092666 to your computer and use it in GitHub Desktop.
Save jen20/6092666 to your computer and use it in GitHub Desktop.
/// <summary>
/// Subscribes to the "all events" feed of an Event Store connection starting from a
/// persisted checkpoint, deserializes each non-system event into an <see cref="IEvent"/>
/// and hands it to an <see cref="IPublisher"/>. After every event that carries a position
/// the checkpoint is advanced, whether or not the event was published successfully.
/// </summary>
public class SlightlyBetterButStillNotGreatDispatcher
{
    // Metadata key holding the CLR type name used to pick the deserialization target.
    private const string EventClrTypeNameKey = "EventClrTypeName";

    private readonly IEventStoreConnection _connection;
    private readonly IPersistGetEventStorePosition _positionCheckpoint;
    private readonly IPublisher _next;
    private readonly Action<EventStoreCatchUpSubscription> _onLiveProcessingStarted;
    private EventStoreAllCatchUpSubscription _subscription;

    /// <summary>
    /// Creates a dispatcher. No subscription is opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="connection">Event Store connection to subscribe on.</param>
    /// <param name="positionCheckpoint">Store used to read and persist the last processed position.</param>
    /// <param name="publisher">Receiver of successfully deserialized events.</param>
    /// <param name="onLiveProcessingStarted">Optional callback passed through to the subscription; may be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connection"/>, <paramref name="positionCheckpoint"/> or
    /// <paramref name="publisher"/> is null.
    /// </exception>
    public SlightlyBetterButStillNotGreatDispatcher(IEventStoreConnection connection, IPersistGetEventStorePosition positionCheckpoint,
        IPublisher publisher, Action<EventStoreCatchUpSubscription> onLiveProcessingStarted = null)
    {
        if (connection == null)
        {
            throw new ArgumentNullException("connection");
        }
        if (positionCheckpoint == null)
        {
            throw new ArgumentNullException("positionCheckpoint");
        }
        if (publisher == null)
        {
            throw new ArgumentNullException("publisher");
        }

        _connection = connection;
        _positionCheckpoint = positionCheckpoint;
        _next = publisher;
        _onLiveProcessingStarted = onLiveProcessingStarted;
    }

    /// <summary>
    /// Opens the subscription from the last persisted checkpoint.
    /// </summary>
    public void Start()
    {
        SetUpSubscription();
    }

    /// <summary>
    /// Stops the current subscription. Does nothing when <see cref="Start"/> has not been
    /// called yet (previously this dereferenced a null field).
    /// </summary>
    /// <param name="timeout">Timeout forwarded to the subscription's Stop call.</param>
    public void Stop(TimeSpan timeout)
    {
        var subscription = _subscription;
        if (subscription != null)
        {
            subscription.Stop(timeout);
        }
    }

    /// <summary>
    /// Reads the checkpoint and (re)creates the subscription from that position.
    /// </summary>
    private void SetUpSubscription()
    {
        var fromPosition = _positionCheckpoint.GetLastProcessedPosition();
        // NOTE(review): the 'false' argument is presumably resolveLinkTos - confirm against
        // the client API version in use.
        _subscription = _connection.SubscribeToAllFrom(fromPosition, false, EventAppeared, _onLiveProcessingStarted, Dropped);
    }

    /// <summary>
    /// Subscription-dropped callback: resubscribes for drop reasons considered recoverable,
    /// does nothing for a user-initiated stop or any other reason.
    /// </summary>
    private void Dropped(EventStoreCatchUpSubscription subscription, SubscriptionDropReason dropReason, Exception exception)
    {
        if (dropReason == SubscriptionDropReason.UserInitiated)
        {
            return;
        }

        //TODO: On ProcessingQueueOverflow, wait and reconnect probably with back off.
        // Until that exists, an overflow is not treated as recoverable and the
        // subscription is not re-established.

        if (SubscriptionDropMayBeRecoverable(dropReason))
        {
            SetUpSubscription();
        }
    }

    /// <summary>
    /// Returns true for the drop reasons after which an immediate resubscribe is attempted.
    /// </summary>
    private static bool SubscriptionDropMayBeRecoverable(SubscriptionDropReason dropReason)
    {
        return dropReason == SubscriptionDropReason.Unknown
            || dropReason == SubscriptionDropReason.SubscribingError
            || dropReason == SubscriptionDropReason.ServerError
            || dropReason == SubscriptionDropReason.ConnectionClosed;
    }

    /// <summary>
    /// Event callback: publishes the event if it can be deserialized, then persists the
    /// event's position as the new checkpoint when the event carries one.
    /// </summary>
    private void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
    {
        IEvent domainEvent;
        if (TryDeserializeAggregateEvent(resolvedEvent, out domainEvent))
        {
            try
            {
                _next.Publish(domainEvent);
            }
            catch (Exception ex)
            {
                // Still deliberately best-effort (a failing handler must not kill the
                // subscription), but leave a trace instead of discarding the failure silently.
                System.Diagnostics.Trace.TraceError(
                    "Publishing event of type '{0}' failed: {1}",
                    resolvedEvent.OriginalEvent.EventType,
                    ex);
            }
        }

        if (resolvedEvent.OriginalPosition.HasValue)
        {
            _positionCheckpoint.PersistLastPositionProcessed(resolvedEvent.OriginalPosition.Value);
        }
    }

    /// <summary>
    /// Attempts to turn a raw event into an <see cref="IEvent"/>.
    /// </summary>
    /// <param name="rawEvent">Event as delivered by the subscription.</param>
    /// <param name="deserializedEvent">The deserialized event on success; otherwise null.</param>
    /// <returns>
    /// False when the event type or stream id starts with "$", when the metadata is missing,
    /// is not a JSON object or lacks the "EventClrTypeName" key, when the named type cannot
    /// be loaded, or when the data cannot be deserialized to an <see cref="IEvent"/>;
    /// true otherwise.
    /// </returns>
    private static bool TryDeserializeAggregateEvent(ResolvedEvent rawEvent, out IEvent deserializedEvent)
    {
        deserializedEvent = null;

        var recorded = rawEvent.OriginalEvent;
        // Ordinal comparison: "$" is a literal prefix marker, not linguistic text.
        if (recorded.EventType.StartsWith("$", StringComparison.Ordinal)
            || recorded.EventStreamId.StartsWith("$", StringComparison.Ordinal))
        {
            return false;
        }

        var metadataBytes = recorded.Metadata;
        if (metadataBytes == null || metadataBytes.Length == 0)
        {
            return false;
        }

        IDictionary<string, JToken> metadata;
        try
        {
            metadata = JObject.Parse(Encoding.UTF8.GetString(metadataBytes));
        }
        catch (JsonException)
        {
            return false;
        }

        JToken typeNameToken;
        if (!metadata.TryGetValue(EventClrTypeNameKey, out typeNameToken))
        {
            return false;
        }

        Type deserializeTo;
        try
        {
            deserializeTo = Type.GetType((string)typeNameToken, true);
        }
        catch (Exception) //TODO be more specific here
        {
            return false;
        }

        var dataBytes = recorded.Data;
        if (dataBytes == null)
        {
            return false;
        }

        try
        {
            var jsonString = Encoding.UTF8.GetString(dataBytes);
            deserializedEvent = JsonConvert.DeserializeObject(jsonString, deserializeTo) as IEvent;
            return deserializedEvent != null;
        }
        catch (JsonException)
        {
            // JsonException is the base of both JsonReaderException and
            // JsonSerializationException; catching only the former let type-mismatch
            // failures escape into the subscription callback.
            return false;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment