Created
February 11, 2018 18:17
-
-
Save leidegre/2ba149694e00491f48613c48cabb1163 to your computer and use it in GitHub Desktop.
An event sourcing example minus the cruft to illustrate basic principles of an event store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Newtonsoft.Json; | |
using Newtonsoft.Json.Linq; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
namespace EventSourcingExample | |
{ | |
/// <summary>
/// A single recorded event: where it belongs (stream and sequence number),
/// what happened (payload) and when it was created.
/// </summary>
struct Event
{
    /// <summary>
    /// Store-wide identifier. <c>EventStore.Append</c> assigns its own value when
    /// committing, so uncommitted events may carry any placeholder here.
    /// </summary>
    public int Id { get; }

    /// <summary>Name of the stream this event belongs to.</summary>
    public string StreamId { get; }

    /// <summary>Position of the event within its stream.</summary>
    public int SequenceNumber { get; }

    /// <summary>JSON payload describing what happened.</summary>
    public JToken Payload { get; }

    /// <summary>Timestamp of the event.</summary>
    public DateTimeOffset Created { get; }

    /// <summary>
    /// Creates an event.
    /// </summary>
    /// <param name="id">Store-wide identifier.</param>
    /// <param name="streamId">Stream the event belongs to.</param>
    /// <param name="sequenceNumber">Position within the stream.</param>
    /// <param name="payload">JSON payload.</param>
    /// <param name="created">
    /// Creation time; when omitted (or <c>null</c>) the current UTC time is used.
    /// </param>
    public Event(int id, string streamId, int sequenceNumber, JToken payload, DateTimeOffset? created = null)
    {
        // Resolve the optional timestamp first, then fill in the remaining members.
        Created = created.HasValue ? created.Value : DateTimeOffset.UtcNow;
        Payload = payload;
        SequenceNumber = sequenceNumber;
        StreamId = streamId;
        Id = id;
    }
}
/// <summary>
/// A minimal in-memory event store. Events are kept in one global, append-only
/// list; a per-stream index maps each stream id to the (1-based) ids of its events.
/// </summary>
class EventStore
{
    // All committed events in commit order; the event with id N lives at position N - 1.
    private readonly List<Event> _store;

    // For each stream id, the ids of its events in sequence order.
    private readonly Dictionary<string, List<int>> _index;

    /// <summary>Creates an empty store.</summary>
    public EventStore()
    {
        _store = new List<Event>();
        _index = new Dictionary<string, List<int>>();
    }

    /// <summary>
    /// Appends a batch of events. Every event must carry the next sequence number
    /// of its stream (1 for a stream that does not exist yet). The batch is
    /// validated as a whole before anything is written, so a rejected batch
    /// leaves the store unchanged.
    /// </summary>
    /// <param name="uncommitted">
    /// Events to commit. Their <c>Id</c> is ignored; the store assigns a new 1-based id.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="uncommitted"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// An event does not carry the expected next sequence number for its stream.
    /// </exception>
    public void Append(IEnumerable<Event> uncommitted)
    {
        if (uncommitted == null)
        {
            throw new ArgumentNullException(nameof(uncommitted));
        }

        // Materialize once: the batch is walked twice (validate, then commit)
        // and the source may be a deferred sequence.
        var batch = new List<Event>(uncommitted);

        // Phase 1: validate without touching the store. `expected` tracks the next
        // sequence number per stream, including events earlier in this same batch.
        var expected = new Dictionary<string, int>();
        foreach (var e in batch)
        {
            int next;
            if (!expected.TryGetValue(e.StreamId, out next))
            {
                next = _index.TryGetValue(e.StreamId, out var existing) ? existing.Count + 1 : 1;
            }

            if (e.SequenceNumber != next)
            {
                throw new InvalidOperationException($"data race detected ({e.StreamId}/{e.SequenceNumber})");
            }

            expected[e.StreamId] = next + 1;
        }

        // Phase 2: commit. Nothing below can fail validation any more.
        foreach (var e in batch)
        {
            var id = _store.Count + 1; // 1 based

            if (!_index.TryGetValue(e.StreamId, out var stream))
            {
                stream = new List<int>();
                _index.Add(e.StreamId, stream);
            }

            stream.Add(id);
            _store.Add(new Event(id, e.StreamId, e.SequenceNumber, e.Payload, e.Created));
        }
    }

    /// <summary>
    /// Lazily enumerates the events of one stream in sequence order.
    /// </summary>
    /// <param name="streamId">Stream to read.</param>
    /// <param name="minSequenceNumber">
    /// First sequence number to return (inclusive); values below 1 are treated as 1.
    /// </param>
    /// <param name="maxSequenceNumber">
    /// Last sequence number to return (inclusive); capped at the stream length.
    /// </param>
    /// <returns>
    /// The matching events, or an empty sequence when the stream does not exist.
    /// </returns>
    public IEnumerable<Event> GetEnumerableStream(string streamId, int minSequenceNumber = 1, int maxSequenceNumber = int.MaxValue)
    {
        if (_index.TryGetValue(streamId, out var stream))
        {
            // The upper bound is fixed when enumeration starts.
            var upperBound = Math.Min(maxSequenceNumber, stream.Count);

            // Clamp the lower bound so a value below 1 cannot produce a negative index.
            for (int i = Math.Max(minSequenceNumber, 1) - 1; i < upperBound; i++)
            {
                // Index entries are 1-based event ids; the backing list is 0-based.
                yield return _store[stream[i] - 1];
            }
        }
    }

    /// <summary>
    /// Lazily enumerates all events across all streams in commit order.
    /// </summary>
    /// <param name="minId">First event id to return (inclusive); values below 1 are treated as 1.</param>
    /// <param name="maxId">Last event id to return (inclusive); capped at the number of stored events.</param>
    /// <returns>The events whose id lies in the requested range.</returns>
    public IEnumerable<Event> GetEnumerable(int minId = 1, int maxId = int.MaxValue)
    {
        var upperBound = Math.Min(maxId, _store.Count);

        // Clamp the lower bound so a value below 1 cannot produce a negative index.
        for (int i = Math.Max(minId, 1) - 1; i < upperBound; i++)
        {
            yield return _store[i];
        }
    }
}
/// <summary>
/// Console demo: creates a user, renames it, dumps the raw events, projects a
/// user table from them and computes a running user count.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        CreateUserExample();
    }

    /// <summary>
    /// Runs the demo scenario against a fresh <see cref="EventStore"/>.
    /// </summary>
    private static void CreateUserExample()
    {
        var es = new EventStore();

        // The two user names must differ: a rename creates a new stream named after
        // the new user, and the store rejects a first event for an existing stream.
        CreateUser("john@example.com", es);
        ChangeUserName("john@example.com", "john.doe@example.com", es);

        Console.WriteLine(JsonConvert.SerializeObject(es.GetEnumerable(), Formatting.Indented));

        ProjectUserTable(es);

        // we can also compute a running total like this,
        // there's no need to query the database for the
        // information, we can create many online algorithms like this
        // that efficiently compute the next "value" as needed
        var userCount = AggregateUserCount(0, es.GetEnumerable(1, 1));
        Console.WriteLine(userCount);

        var userCount2 = AggregateUserCount(userCount, es.GetEnumerable(2, 2));
        Console.WriteLine(userCount2);

        var userCount3 = AggregateUserCount(userCount2, es.GetEnumerable(3, 3));
        Console.WriteLine(userCount3);
    }

    /// <summary>
    /// Records a "userCreated" event as the first event of the user's stream.
    /// </summary>
    /// <param name="userName">User name; also used as the stream id.</param>
    /// <param name="es">Store to append to.</param>
    /// <exception cref="InvalidOperationException">The user's stream already has events.</exception>
    private static void CreateUser(string userName, EventStore es)
    {
        // for simplicity sake, use `userName` as `streamId`
        var stream = es.GetEnumerableStream(userName);
        if (stream.Any())
        {
            throw new InvalidOperationException("user exists");
        }

        var e = new Event(0, userName, 1, new JObject { { "@type", "userCreated" }, { "userName", userName } });
        es.Append(new[] { e });
    }

    /// <summary>
    /// Renames a user by creating a stream for the new name and recording a
    /// "userNameChanged" event on the old stream.
    /// </summary>
    /// <param name="currentUserName">Existing user name (stream id).</param>
    /// <param name="newUserName">New user name (stream id of the new stream).</param>
    /// <param name="es">Store to read from and append to.</param>
    /// <exception cref="InvalidOperationException">
    /// The current user has no events, or the store rejects the batch.
    /// </exception>
    private static void ChangeUserName(string currentUserName, string newUserName, EventStore es)
    {
        // Read the stream once; the store returns a deferred sequence and we need
        // both an emptiness check and the last element.
        var stream1 = es.GetEnumerableStream(currentUserName).ToList();
        if (stream1.Count == 0)
        {
            throw new InvalidOperationException("user does not exist");
        }

        // There is no explicit check that `newUserName` is free: the event store
        // rejects a sequence-number-1 event for a stream that already exists.
        var last = stream1[stream1.Count - 1];

        var e1 = new Event(0, currentUserName, last.SequenceNumber + 1, new JObject { { "@type", "userNameChanged" }, { "newUserName", newUserName } });
        var e2 = new Event(0, newUserName, 1, new JObject { { "@type", "userCreated" }, { "userName", newUserName } });

        // The new user is created before the rename is recorded; the projection
        // below relies on that order.
        es.Append(new[] { e2, e1 });
    }

    /// <summary>Row of the projected user table.</summary>
    class UserEntity
    {
        public string UserName { get; set; }

        // Previous names of this user, oldest first; stays null until the first rename.
        public List<string> UserNameHistory { get; set; }
    }

    /// <summary>
    /// Replays every event in the store into an in-memory user table keyed by
    /// stream id, printing the table as JSON after each event.
    /// </summary>
    /// <param name="es">Store to replay.</param>
    private static void ProjectUserTable(EventStore es)
    {
        var userTable = new Dictionary<string, UserEntity>();

        foreach (var e in es.GetEnumerable())
        {
            switch ((string)e.Payload["@type"])
            {
                case "userCreated":
                    {
                        userTable[e.StreamId] = new UserEntity { UserName = (string)e.Payload["userName"] };
                        break;
                    }
                case "userNameChanged":
                    {
                        // given the order in which these are created
                        // we can assume that the old user exists (no need to double check that)
                        // and given we create the new user before we issue the rename
                        // we don't have to check that the new user exists either
                        // we do however have to ensure that the list is not null
                        var currentUser = userTable[e.StreamId];
                        userTable.Remove(e.StreamId);

                        var newUser = userTable[(string)e.Payload["newUserName"]];
                        if (newUser.UserNameHistory == null)
                        {
                            newUser.UserNameHistory = new List<string>();
                        }

                        // Carry over names from earlier renames so a chain a -> b -> c
                        // ends with history [a, b] rather than just [b].
                        if (currentUser.UserNameHistory != null)
                        {
                            newUser.UserNameHistory.AddRange(currentUser.UserNameHistory);
                        }

                        newUser.UserNameHistory.Add(e.StreamId);
                        break;
                    }
            }

            // Printed inside the loop so the table is shown after every event.
            Console.WriteLine(JsonConvert.SerializeObject(userTable, Formatting.Indented));
        }
    }

    /// <summary>
    /// Folds events into a running user count: "userCreated" adds one and
    /// "userNameChanged" subtracts one (a rename also emits a "userCreated" for the
    /// new name, so the net effect of a rename is zero). Other event types are ignored.
    /// </summary>
    /// <param name="count">Count accumulated so far.</param>
    /// <param name="source">Events to fold in.</param>
    /// <returns>The updated count.</returns>
    private static int AggregateUserCount(int count, IEnumerable<Event> source)
    {
        foreach (var e in source)
        {
            switch ((string)e.Payload["@type"])
            {
                case "userCreated":
                    {
                        count++;
                        break;
                    }
                case "userNameChanged":
                    {
                        count--;
                        break;
                    }
            }
        }

        return count;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment