Created
January 17, 2012 16:03
-
-
Save haf/1627224 to your computer and use it in GitHub Desktop.
Spike MassTransit EventStore Saga Repo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2007-2011 Henrik Feldt | |
// | |
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use | |
// this file except in compliance with the License. You may obtain a copy of the | |
// License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, software distributed | |
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR | |
// CONDITIONS OF ANY KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations under the License. | |
namespace MassTransit.EventStoreIntegration | |
{ | |
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
using System.Linq; | |
using EventStore; | |
using EventStore.Persistence; | |
using Exceptions; | |
using Pipeline; | |
using Saga; | |
using Util; | |
using log4net; | |
public interface IEventSourcedSaga : ISaga | |
{ | |
/// <summary> | |
/// Gets the saga version. | |
/// </summary> | |
ulong Version { get; } | |
/// <summary> | |
/// Apply an event that causes the saga to transition. | |
/// </summary> | |
/// <param name="message"></param> | |
void Apply(object message); | |
ICollection GetUncommittedEvents(); | |
void ClearUncommittedEvents(); | |
ICollection GetUndispatchedMessages(); | |
void ClearUndispatchedMessages(); | |
} | |
/// <summary> | |
/// joliver's Event Store backing of sagas! | |
/// </summary> | |
/// <typeparam name="TSaga">The type of saga.</typeparam> | |
public class EventStoreRepository<TSaga> : | |
ISagaRepository<TSaga> | |
where TSaga : class, ISaga, IEventSourcedSaga, new() | |
{ | |
static readonly ILog _log = LogManager.GetLogger(typeof (InMemorySagaRepository<TSaga>)); | |
readonly IStoreEvents _eventStore; | |
public EventStoreRepository([NotNull] IStoreEvents eventStore) | |
{ | |
if (eventStore == null) throw new ArgumentNullException("eventStore"); | |
_eventStore = eventStore; | |
} | |
IEnumerable<Action<IConsumeContext<TMessage>>> ISagaRepository<TSaga>.GetSaga<TMessage>( | |
IConsumeContext<TMessage> context, Guid sagaId, | |
InstanceHandlerSelector<TSaga, TMessage> selector, | |
ISagaPolicy<TSaga, TMessage> policy) | |
{ | |
TSaga instance; | |
IEventStream eventStream; | |
try | |
{ | |
eventStream = _eventStore.OpenStream(sagaId, 0, int.MaxValue); | |
instance = new TSaga(); | |
foreach (var @event in eventStream.CommittedEvents.Select(x => x.Body)) | |
instance.Apply(@event); | |
} | |
catch (StreamNotFoundException ex) | |
{ | |
_log.Info("could not find saga", ex); | |
instance = null; | |
eventStream = _eventStore.CreateStream(sagaId); | |
} | |
if (instance == null) | |
{ | |
if (policy.CanCreateInstance(context)) | |
{ | |
yield return x => | |
{ | |
if (_log.IsDebugEnabled) | |
_log.DebugFormat("SAGA: {0} Creating New {1} for {2}", typeof (TSaga).ToFriendlyName(), sagaId, | |
typeof (TMessage).ToFriendlyName()); | |
try | |
{ | |
instance = policy.CreateInstance(x, sagaId); | |
foreach (var callback in selector(instance, x)) | |
{ | |
callback(x); | |
} | |
if (!policy.CanRemoveInstance(instance)) | |
Save(instance, eventStream, new Guid(x.RequestId)); | |
} | |
catch (Exception ex) | |
{ | |
var sex = new SagaException("Create Saga Instance Exception", typeof (TSaga), typeof (TMessage), sagaId, ex); | |
if (_log.IsErrorEnabled) | |
_log.Error(sex); | |
throw sex; | |
} | |
}; | |
} | |
else | |
{ | |
if (_log.IsDebugEnabled) | |
_log.DebugFormat("SAGA: {0} Ignoring Missing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId, | |
typeof(TMessage).ToFriendlyName()); | |
} | |
} | |
else | |
{ | |
if (policy.CanUseExistingInstance(context)) | |
{ | |
yield return x => | |
{ | |
if (_log.IsDebugEnabled) | |
_log.DebugFormat("SAGA: {0} Using Existing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId, | |
typeof(TMessage).ToFriendlyName()); | |
try | |
{ | |
foreach (var callback in selector(instance, x)) | |
{ | |
callback(x); | |
} | |
if (policy.CanRemoveInstance(instance)) ; | |
// no need to work | |
} | |
catch (Exception ex) | |
{ | |
var sex = new SagaException("Existing Saga Instance Exception", typeof(TSaga), typeof(TMessage), sagaId, ex); | |
if (_log.IsErrorEnabled) | |
_log.Error(sex); | |
throw sex; | |
} | |
}; | |
} | |
else | |
{ | |
if (_log.IsDebugEnabled) | |
_log.DebugFormat("SAGA: {0} Ignoring Existing {1} for {2}", typeof(TSaga).ToFriendlyName(), sagaId, | |
typeof(TMessage).ToFriendlyName()); | |
} | |
} | |
} | |
void Save(TSaga instance, IEventStream sagaStream, Guid commitId) | |
{ | |
var stream = this.PrepareStream(instance, new Dictionary<string, object>(), sagaStream); | |
Persist(stream, commitId); | |
instance.ClearUncommittedEvents(); | |
instance.ClearUndispatchedMessages(); | |
} | |
private IEventStream PrepareStream(TSaga saga, Dictionary<string, object> headers, IEventStream stream) | |
{ | |
foreach (var item in headers) | |
stream.UncommittedHeaders[item.Key] = item.Value; | |
saga.GetUncommittedEvents() | |
.Cast<object>() | |
.Select(x => new EventMessage { Body = x }) | |
.ToList() | |
.ForEach(stream.Add); | |
return stream; | |
} | |
private static void Persist(IEventStream stream, Guid commitId) | |
{ | |
try | |
{ | |
stream.CommitChanges(commitId); | |
} | |
catch (DuplicateCommitException) | |
{ | |
stream.ClearChanges(); | |
} | |
catch (StorageException e) | |
{ | |
throw new SagaException(e.Message, typeof(TSaga), typeof(object)); | |
} | |
} | |
IEnumerable<Guid> ISagaRepository<TSaga>.Find(ISagaFilter<TSaga> filter) | |
{ | |
throw new NotImplementedException(); | |
} | |
IEnumerable<TSaga> ISagaRepository<TSaga>.Where(ISagaFilter<TSaga> filter) | |
{ | |
throw new NotImplementedException(); | |
} | |
IEnumerable<TResult> ISagaRepository<TSaga>.Where<TResult>(ISagaFilter<TSaga> filter, Func<TSaga, TResult> transformer) | |
{ | |
throw new NotImplementedException(); | |
} | |
IEnumerable<TResult> ISagaRepository<TSaga>.Select<TResult>(Func<TSaga, TResult> transformer) | |
{ | |
throw new NotImplementedException(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment