Created
March 27, 2014 11:16
-
-
Save jnthn/9805301 to your computer and use it in GitHub Desktop.
Edument.CQRS EventStore binding
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Text; | |
using System.Xml.Serialization; | |
using EventStore.ClientAPI; | |
namespace Edument.CQRS | |
{ | |
public class ESEventStore : IEventStore | |
{ | |
private IEventStoreConnection conn = EventStoreConnection | |
.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113)); | |
public ESEventStore() | |
{ | |
conn.Connect(); | |
} | |
public IEnumerable LoadEventsFor<TAggregate>(Guid id) | |
{ | |
StreamEventsSlice currentSlice; | |
var nextSliceStart = StreamPosition.Start; | |
do | |
{ | |
currentSlice = conn.ReadStreamEventsForward(id.ToString(), nextSliceStart, 200, false); | |
foreach (var e in currentSlice.Events) | |
yield return Deserialize(e.Event.EventType, e.Event.Data); | |
nextSliceStart = currentSlice.NextEventNumber; | |
} while (!currentSlice.IsEndOfStream); | |
} | |
private object Deserialize(string typeName, byte[] data) | |
{ | |
var ser = new XmlSerializer(Type.GetType(typeName)); | |
var ms = new MemoryStream(data); | |
ms.Seek(0, SeekOrigin.Begin); | |
return ser.Deserialize(ms); | |
} | |
public void SaveEventsFor<TAggregate>(Guid? id, int eventsLoaded, ArrayList newEvents) | |
{ | |
// Establish the aggregate ID to save the events under and ensure they | |
// all have the correct ID. | |
if (newEvents.Count == 0) | |
return; | |
Guid aggregateId = id ?? GetAggregateIdFromEvent(newEvents[0]); | |
foreach (var e in newEvents) | |
if (GetAggregateIdFromEvent(e) != aggregateId) | |
throw new InvalidOperationException( | |
"Cannot save events reporting inconsistent aggregate IDs"); | |
var expected = eventsLoaded == 0 ? ExpectedVersion.NoStream : eventsLoaded - 1; | |
conn.AppendToStream(aggregateId.ToString(), expected, newEvents | |
.Cast<dynamic>() | |
.Select(e => new EventData(e.Id, e.GetType().AssemblyQualifiedName, | |
false, Serialize(e), null))); | |
} | |
private Guid GetAggregateIdFromEvent(object e) | |
{ | |
var idField = e.GetType().GetField("Id"); | |
if (idField == null) | |
throw new Exception("Event type " + e.GetType().Name + " is missing an Id field"); | |
return (Guid)idField.GetValue(e); | |
} | |
private byte[] Serialize(object obj) | |
{ | |
var ser = new XmlSerializer(obj.GetType()); | |
var ms = new MemoryStream(); | |
ser.Serialize(ms, obj); | |
ms.Seek(0, SeekOrigin.Begin); | |
return ms.ToArray(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment