-
-
Save dimajanzen/4635660 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class GetEventStoreRepository : IRepository | |
{ | |
private const string EventClrTypeHeader = "EventClrTypeName"; | |
private const string AggregateClrTypeHeader = "AggregateClrTypeName"; | |
private const string CommitIdHeader = "CommitId"; | |
private const int WritePageSize = 500; | |
private const int ReadPageSize = 500; | |
private readonly Func<Type, Guid, string> _aggregateIdToStreamName; | |
private readonly EventStoreConnection _eventStoreConnection; | |
private readonly IPEndPoint _tcpEndpoint; | |
public GetEventStoreRepository(EventStoreConnection eventStoreConnection, IPEndPoint eventStoreTcpEndpoint) | |
: this(eventStoreConnection, eventStoreTcpEndpoint, (t, g) => string.Format("{0}-{1}", char.ToLower(t.Name[0]) + t.Name.Substring(1), g)) | |
{ | |
} | |
public GetEventStoreRepository(EventStoreConnection eventStoreConnection, IPEndPoint eventStoreTcpEndpoint, Func<Type, Guid, string> aggregateIdToStreamName) | |
{ | |
_eventStoreConnection = eventStoreConnection; | |
_tcpEndpoint = eventStoreTcpEndpoint; | |
_aggregateIdToStreamName = aggregateIdToStreamName; | |
} | |
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate | |
{ | |
EnsureConnected(); | |
var streamName = _aggregateIdToStreamName(typeof(TAggregate), id); | |
var aggregate = ConstructAggregate<TAggregate>(); | |
StreamEventsSlice currentSlice; | |
var nextSliceStart = 1; | |
do | |
{ | |
currentSlice = _eventStoreConnection.ReadStreamEventsForward(streamName, nextSliceStart, ReadPageSize, false); | |
nextSliceStart = currentSlice.NextEventNumber; | |
foreach (var evnt in currentSlice.Events) | |
aggregate.ApplyEvent(DeserializeEvent(evnt.OriginalEvent.Metadata, evnt.OriginalEvent.Data)); | |
} while (!currentSlice.IsEndOfStream); | |
return aggregate; | |
} | |
public TAggregate GetById<TAggregate>(Guid id, int version) where TAggregate : class, IAggregate | |
{ | |
EnsureConnected(); | |
var streamName = _aggregateIdToStreamName(typeof(TAggregate), id); | |
var aggregate = ConstructAggregate<TAggregate>(); | |
var sliceStart = 1; //Ignores $StreamCreated | |
StreamEventsSlice currentSlice; | |
do | |
{ | |
var sliceCount = sliceStart + ReadPageSize <= version | |
? ReadPageSize | |
: version - sliceStart + 1; | |
currentSlice = _eventStoreConnection.ReadStreamEventsForward(streamName, sliceStart, sliceCount, false); | |
sliceStart = currentSlice.NextEventNumber; | |
foreach (var evnt in currentSlice.Events) | |
aggregate.ApplyEvent(DeserializeEvent(evnt.OriginalEvent.Metadata, evnt.OriginalEvent.Data)); | |
} while (version > currentSlice.NextEventNumber && !currentSlice.IsEndOfStream); | |
return aggregate; | |
} | |
public object DeserializeEvent(byte[] metadata, byte[] data) | |
{ | |
var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader).Value; | |
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName)); | |
} | |
public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders) | |
{ | |
EnsureConnected(); | |
var commitHeaders = new Dictionary<string, object> | |
{ | |
{CommitIdHeader, commitId}, | |
{AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName} | |
}; | |
updateHeaders(commitHeaders); | |
var streamName = _aggregateIdToStreamName(aggregate.GetType(), aggregate.Id); | |
var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList(); | |
var originalVersion = aggregate.Version - newEvents.Count; | |
var expectedVersion = originalVersion == 0 ? -1 : originalVersion; | |
var preparedEvents = PrepareEvents(newEvents, commitHeaders).ToList(); | |
if (preparedEvents.Count < WritePageSize) | |
{ | |
_eventStoreConnection.AppendToStream(streamName, expectedVersion, preparedEvents); | |
} | |
else | |
{ | |
var transaction = _eventStoreConnection.StartTransaction(streamName, expectedVersion); | |
var position = 0; | |
while (position < preparedEvents.Count) | |
{ | |
var pageEvents = preparedEvents.Skip(position).Take(WritePageSize); | |
transaction.Write(pageEvents); | |
position += WritePageSize; | |
} | |
transaction.Commit(); | |
} | |
aggregate.ClearUncommittedEvents(); | |
} | |
private static IEnumerable<EventData> PrepareEvents(IEnumerable<object> events, IDictionary<string, object> commitHeaders) | |
{ | |
return events.Select(e => JsonEventData.Create(Guid.NewGuid(), e, commitHeaders)); | |
} | |
private static TAggregate ConstructAggregate<TAggregate>() | |
{ | |
return (TAggregate)Activator.CreateInstance(typeof(TAggregate), true); | |
} | |
private bool _isConnected; | |
private void EnsureConnected() | |
{ | |
if (_isConnected) | |
return; | |
_eventStoreConnection.Connect(_tcpEndpoint); | |
_isConnected = true; | |
} | |
private static class JsonEventData | |
{ | |
public static EventData Create(Guid eventId, object evnt, IDictionary<string, object> headers) | |
{ | |
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, SerializerSettings)); | |
var metadata = AddEventClrTypeHeaderAndSerializeMetadata(evnt, headers); | |
var typeName = evnt.GetType().Name; | |
return new EventData(eventId, typeName, true, data, metadata); | |
} | |
private static readonly JsonSerializerSettings SerializerSettings; | |
private static byte[] AddEventClrTypeHeaderAndSerializeMetadata(object evnt, IDictionary<string, object> headers) | |
{ | |
var eventHeaders = new Dictionary<string, object>(headers) | |
{ | |
{EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName} | |
}; | |
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, SerializerSettings)); | |
} | |
static JsonEventData() | |
{ | |
SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None }; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Integration tests for the GetEventStoreRepository. These tests require a | |
/// running version of the Event Store, with a TCP endpoint as specified in the | |
/// IntegrationTestTcpEndPoint field (defaults to local loopback, port 1113). | |
/// </summary> | |
[TestFixture] | |
public class GetEventStoreRepositoryIntegrationTests | |
{ | |
/// <summary> | |
/// Set this to the TCP endpoint on which the Event Store is running. | |
/// </summary> | |
private static readonly IPEndPoint IntegrationTestTcpEndPoint = new IPEndPoint(IPAddress.Loopback, 1113); | |
private static Guid SaveTestAggregateWithoutCustomHeaders(IRepository repository, int numberOfEvents) | |
{ | |
var aggregateToSave = new TestAggregate(Guid.NewGuid()); | |
aggregateToSave.ProduceEvents(numberOfEvents); | |
repository.Save(aggregateToSave, Guid.NewGuid(), d => { }); | |
return aggregateToSave.Id; | |
} | |
[Test] | |
public void ClearsEventsFromAggregateOnceCommitted() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var aggregateToSave = new TestAggregate(Guid.NewGuid()); | |
aggregateToSave.ProduceEvents(10); | |
repo.Save(aggregateToSave, Guid.NewGuid(), d => { }); | |
Assert.AreEqual(0, ((IAggregate)aggregateToSave).GetUncommittedEvents().Count); | |
} | |
[Test] | |
public void CanGetLatestVersionById() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 3000 /* excludes TestAggregateCreated */); | |
var retrieved = repo.GetById<TestAggregate>(savedId); | |
Assert.AreEqual(3000, retrieved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void CanGetSpecificVersionFromFirstPageById() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */); | |
var retrieved = repo.GetById<TestAggregate>(savedId, 65); | |
Assert.AreEqual(64, retrieved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void CanGetSpecificVersionFromSubsequentPageById() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 500 /* excludes TestAggregateCreated */); | |
var retrieved = repo.GetById<TestAggregate>(savedId, 126); | |
Assert.AreEqual(125, retrieved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void CanSaveExistingAggregate() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */); | |
var firstSaved = repo.GetById<TestAggregate>(savedId); | |
firstSaved.ProduceEvents(50); | |
repo.Save(firstSaved, Guid.NewGuid(), d => { }); | |
var secondSaved = repo.GetById<TestAggregate>(savedId); | |
Assert.AreEqual(150, secondSaved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void CanSaveMultiplesOfWritePageSize() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 1500 /* excludes TestAggregateCreated */); | |
var saved = repo.GetById<TestAggregate>(savedId); | |
Assert.AreEqual(1500, saved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void GetsEventsFromCorrectStreams() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var aggregate1Id = SaveTestAggregateWithoutCustomHeaders(repo, 100); | |
var aggregate2Id = SaveTestAggregateWithoutCustomHeaders(repo, 50); | |
var firstSaved = repo.GetById<TestAggregate>(aggregate1Id); | |
Assert.AreEqual(100, firstSaved.AppliedEventCount); | |
var secondSaved = repo.GetById<TestAggregate>(aggregate2Id); | |
Assert.AreEqual(50, secondSaved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void CanHandleLargeNumberOfEventsInOneTransaction() | |
{ | |
const int numberOfEvents = 50000; | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var aggregateId = SaveTestAggregateWithoutCustomHeaders(repo, numberOfEvents); | |
var saved = repo.GetById<TestAggregate>(aggregateId); | |
Assert.AreEqual(numberOfEvents, saved.AppliedEventCount); | |
connection.Close(); | |
} | |
[Test] | |
public void SavesCommitHeadersOnEachEvent() | |
{ | |
var connection = EventStoreConnection.Create(); | |
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint); | |
var commitId = Guid.NewGuid(); | |
var aggregateToSave = new TestAggregate(Guid.NewGuid()); | |
aggregateToSave.ProduceEvents(20); | |
repo.Save(aggregateToSave, commitId, d => | |
{ | |
d.Add("CustomHeader1", "CustomValue1"); | |
d.Add("CustomHeader2", "CustomValue2"); | |
}); | |
var read = connection.ReadStreamEventsForward(string.Format("aggregate-{0}", aggregateToSave.Id), 1, 20, false); | |
foreach (var serializedEvent in read.Events) | |
{ | |
var parsedMetadata = JObject.Parse(Encoding.UTF8.GetString(serializedEvent.OriginalEvent.Metadata)); | |
var deserializedCommitId = parsedMetadata.Property("CommitId").Value.ToObject<Guid>(); | |
Assert.AreEqual(commitId, deserializedCommitId); | |
var deserializedCustomHeader1 = parsedMetadata.Property("CustomHeader1").Value.ToObject<string>(); | |
Assert.AreEqual("CustomValue1", deserializedCustomHeader1); | |
var deserializedCustomHeader2 = parsedMetadata.Property("CustomHeader2").Value.ToObject<string>(); | |
Assert.AreEqual("CustomValue2", deserializedCustomHeader2); | |
} | |
connection.Close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestAggregate : AggregateBase | |
{ | |
public int AppliedEventCount { get; private set; } | |
public TestAggregate(Guid aggregateId) | |
: this() | |
{ | |
RaiseEvent(new TestAggregateCreated(aggregateId)); | |
} | |
public void ProduceEvents(int count) | |
{ | |
for (var i = 0; i < count; i++) | |
RaiseEvent(new WoftamEvent("Woftam1-" + i, "Woftam2-" + i)); | |
} | |
private TestAggregate() | |
{ | |
Register<TestAggregateCreated>(e => Id = e.AggregateId); | |
Register<WoftamEvent>(e => AppliedEventCount++); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestAggregateCreated | |
{ | |
public TestAggregateCreated(Guid aggregateId) | |
{ | |
AggregateId = aggregateId; | |
} | |
public Guid AggregateId { get; private set; } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class WoftamEvent | |
{ | |
public WoftamEvent(string property1, string property2) | |
{ | |
Property1 = property1; | |
Property2 = property2; | |
} | |
public string Property1 { get; private set; } | |
public string Property2 { get; private set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment