Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save hossambarakat/d3f435f822f69bc54e23cb28510d7ea6 to your computer and use it in GitHub Desktop.
Save hossambarakat/d3f435f822f69bc54e23cb28510d7ea6 to your computer and use it in GitHub Desktop.
public class GetEventStoreRepository : IRepository
{
private const string EventClrTypeHeader = "EventClrTypeName";
private const string AggregateClrTypeHeader = "AggregateClrTypeName";
private const string CommitIdHeader = "CommitId";
private const int WritePageSize = 500;
private const int ReadPageSize = 500;
private readonly Func<Type, Guid, string> _aggregateIdToStreamName;
private readonly EventStoreConnection _eventStoreConnection;
private readonly IPEndPoint _tcpEndpoint;
public GetEventStoreRepository(EventStoreConnection eventStoreConnection, IPEndPoint eventStoreTcpEndpoint)
: this(eventStoreConnection, eventStoreTcpEndpoint, (t, g) => string.Format("{0}-{1}", char.ToLower(t.Name[0]) + t.Name.Substring(1), g))
{
}
public GetEventStoreRepository(EventStoreConnection eventStoreConnection, IPEndPoint eventStoreTcpEndpoint, Func<Type, Guid, string> aggregateIdToStreamName)
{
_eventStoreConnection = eventStoreConnection;
_tcpEndpoint = eventStoreTcpEndpoint;
_aggregateIdToStreamName = aggregateIdToStreamName;
}
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IAggregate
{
EnsureConnected();
var streamName = _aggregateIdToStreamName(typeof(TAggregate), id);
var aggregate = ConstructAggregate<TAggregate>();
StreamEventsSlice currentSlice;
var nextSliceStart = 1;
do
{
currentSlice = _eventStoreConnection.ReadStreamEventsForward(streamName, nextSliceStart, ReadPageSize, false);
nextSliceStart = currentSlice.NextEventNumber;
foreach (var evnt in currentSlice.Events)
aggregate.ApplyEvent(DeserializeEvent(evnt.OriginalEvent.Metadata, evnt.OriginalEvent.Data));
} while (!currentSlice.IsEndOfStream);
return aggregate;
}
public TAggregate GetById<TAggregate>(Guid id, int version) where TAggregate : class, IAggregate
{
EnsureConnected();
var streamName = _aggregateIdToStreamName(typeof(TAggregate), id);
var aggregate = ConstructAggregate<TAggregate>();
var sliceStart = 1; //Ignores $StreamCreated
StreamEventsSlice currentSlice;
do
{
var sliceCount = sliceStart + ReadPageSize <= version
? ReadPageSize
: version - sliceStart + 1;
currentSlice = _eventStoreConnection.ReadStreamEventsForward(streamName, sliceStart, sliceCount, false);
sliceStart = currentSlice.NextEventNumber;
foreach (var evnt in currentSlice.Events)
aggregate.ApplyEvent(DeserializeEvent(evnt.OriginalEvent.Metadata, evnt.OriginalEvent.Data));
} while (version > currentSlice.NextEventNumber && !currentSlice.IsEndOfStream);
return aggregate;
}
public object DeserializeEvent(byte[] metadata, byte[] data)
{
var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader).Value;
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
}
public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
EnsureConnected();
var commitHeaders = new Dictionary<string, object>
{
{CommitIdHeader, commitId},
{AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
};
updateHeaders(commitHeaders);
var streamName = _aggregateIdToStreamName(aggregate.GetType(), aggregate.Id);
var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
var originalVersion = aggregate.Version - newEvents.Count;
var expectedVersion = originalVersion == 0 ? -1 : originalVersion;
var preparedEvents = PrepareEvents(newEvents, commitHeaders).ToList();
if (preparedEvents.Count < WritePageSize)
{
_eventStoreConnection.AppendToStream(streamName, expectedVersion, preparedEvents);
}
else
{
var transaction = _eventStoreConnection.StartTransaction(streamName, expectedVersion);
var position = 0;
while (position < preparedEvents.Count)
{
var pageEvents = preparedEvents.Skip(position).Take(WritePageSize);
transaction.Write(pageEvents);
position += WritePageSize;
}
transaction.Commit();
}
aggregate.ClearUncommittedEvents();
}
private static IEnumerable<EventData> PrepareEvents(IEnumerable<object> events, IDictionary<string, object> commitHeaders)
{
return events.Select(e => JsonEventData.Create(Guid.NewGuid(), e, commitHeaders));
}
private static TAggregate ConstructAggregate<TAggregate>()
{
return (TAggregate)Activator.CreateInstance(typeof(TAggregate), true);
}
private bool _isConnected;
private void EnsureConnected()
{
if (_isConnected)
return;
_eventStoreConnection.Connect(_tcpEndpoint);
_isConnected = true;
}
private static class JsonEventData
{
public static EventData Create(Guid eventId, object evnt, IDictionary<string, object> headers)
{
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, SerializerSettings));
var metadata = AddEventClrTypeHeaderAndSerializeMetadata(evnt, headers);
var typeName = evnt.GetType().Name;
return new EventData(eventId, typeName, true, data, metadata);
}
private static readonly JsonSerializerSettings SerializerSettings;
private static byte[] AddEventClrTypeHeaderAndSerializeMetadata(object evnt, IDictionary<string, object> headers)
{
var eventHeaders = new Dictionary<string, object>(headers)
{
{EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName}
};
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, SerializerSettings));
}
static JsonEventData()
{
SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
}
}
}
/// <summary>
/// Integration tests for the GetEventStoreRepository. These tests require a
/// running version of the Event Store, with a TCP endpoint as specified in the
/// IntegrationTestTcpEndPoint field (defaults to local loopback, port 1113).
/// </summary>
[TestFixture]
public class GetEventStoreRepositoryIntegrationTests
{
/// <summary>
/// Set this to the TCP endpoint on which the Event Store is running.
/// </summary>
private static readonly IPEndPoint IntegrationTestTcpEndPoint = new IPEndPoint(IPAddress.Loopback, 1113);
private static Guid SaveTestAggregateWithoutCustomHeaders(IRepository repository, int numberOfEvents)
{
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(numberOfEvents);
repository.Save(aggregateToSave, Guid.NewGuid(), d => { });
return aggregateToSave.Id;
}
[Test]
public void ClearsEventsFromAggregateOnceCommitted()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(10);
repo.Save(aggregateToSave, Guid.NewGuid(), d => { });
Assert.AreEqual(0, ((IAggregate)aggregateToSave).GetUncommittedEvents().Count);
}
[Test]
public void CanGetLatestVersionById()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 3000 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById<TestAggregate>(savedId);
Assert.AreEqual(3000, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanGetSpecificVersionFromFirstPageById()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById<TestAggregate>(savedId, 65);
Assert.AreEqual(64, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanGetSpecificVersionFromSubsequentPageById()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 500 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById<TestAggregate>(savedId, 126);
Assert.AreEqual(125, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanSaveExistingAggregate()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */);
var firstSaved = repo.GetById<TestAggregate>(savedId);
firstSaved.ProduceEvents(50);
repo.Save(firstSaved, Guid.NewGuid(), d => { });
var secondSaved = repo.GetById<TestAggregate>(savedId);
Assert.AreEqual(150, secondSaved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanSaveMultiplesOfWritePageSize()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 1500 /* excludes TestAggregateCreated */);
var saved = repo.GetById<TestAggregate>(savedId);
Assert.AreEqual(1500, saved.AppliedEventCount);
connection.Close();
}
[Test]
public void GetsEventsFromCorrectStreams()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var aggregate1Id = SaveTestAggregateWithoutCustomHeaders(repo, 100);
var aggregate2Id = SaveTestAggregateWithoutCustomHeaders(repo, 50);
var firstSaved = repo.GetById<TestAggregate>(aggregate1Id);
Assert.AreEqual(100, firstSaved.AppliedEventCount);
var secondSaved = repo.GetById<TestAggregate>(aggregate2Id);
Assert.AreEqual(50, secondSaved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanHandleLargeNumberOfEventsInOneTransaction()
{
const int numberOfEvents = 50000;
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var aggregateId = SaveTestAggregateWithoutCustomHeaders(repo, numberOfEvents);
var saved = repo.GetById<TestAggregate>(aggregateId);
Assert.AreEqual(numberOfEvents, saved.AppliedEventCount);
connection.Close();
}
[Test]
public void SavesCommitHeadersOnEachEvent()
{
var connection = EventStoreConnection.Create();
var repo = new GetEventStoreRepository(connection, IntegrationTestTcpEndPoint);
var commitId = Guid.NewGuid();
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(20);
repo.Save(aggregateToSave, commitId, d =>
{
d.Add("CustomHeader1", "CustomValue1");
d.Add("CustomHeader2", "CustomValue2");
});
var read = connection.ReadStreamEventsForward(string.Format("aggregate-{0}", aggregateToSave.Id), 1, 20, false);
foreach (var serializedEvent in read.Events)
{
var parsedMetadata = JObject.Parse(Encoding.UTF8.GetString(serializedEvent.OriginalEvent.Metadata));
var deserializedCommitId = parsedMetadata.Property("CommitId").Value.ToObject<Guid>();
Assert.AreEqual(commitId, deserializedCommitId);
var deserializedCustomHeader1 = parsedMetadata.Property("CustomHeader1").Value.ToObject<string>();
Assert.AreEqual("CustomValue1", deserializedCustomHeader1);
var deserializedCustomHeader2 = parsedMetadata.Property("CustomHeader2").Value.ToObject<string>();
Assert.AreEqual("CustomValue2", deserializedCustomHeader2);
}
connection.Close();
}
}
public class TestAggregate : AggregateBase
{
public int AppliedEventCount { get; private set; }
public TestAggregate(Guid aggregateId)
: this()
{
RaiseEvent(new TestAggregateCreated(aggregateId));
}
public void ProduceEvents(int count)
{
for (var i = 0; i < count; i++)
RaiseEvent(new WoftamEvent("Woftam1-" + i, "Woftam2-" + i));
}
private TestAggregate()
{
Register<TestAggregateCreated>(e => Id = e.AggregateId);
Register<WoftamEvent>(e => AppliedEventCount++);
}
}
public class TestAggregateCreated
{
public TestAggregateCreated(Guid aggregateId)
{
AggregateId = aggregateId;
}
public Guid AggregateId { get; private set; }
}
public class WoftamEvent
{
public WoftamEvent(string property1, string property2)
{
Property1 = property1;
Property2 = property2;
}
public string Property1 { get; private set; }
public string Property2 { get; private set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment