Last active
August 29, 2015 14:03
-
-
Save serialseb/d9a5926849ee9a0c6a6d to your computer and use it in GitHub Desktop.
Projecting things
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using MongoDB.Bson.Serialization.Attributes; | |
using MongoDB.Bson.Serialization.IdGenerators; | |
using MongoDB.Driver; | |
using MongoDB.Driver.Builders; | |
using NEventStore; | |
using NEventStore.Dispatcher; | |
namespace TestNewProjections | |
{ | |
/// <summary>
/// Console harness that writes commits to an in-memory NEventStore, lets a
/// number of projectors race to project them into an
/// <see cref="IProjectionStore"/>, and prints what happened.
/// </summary>
class Program
{
    const int CONSUMER_COUNT = 10;
    const int AGGREGATE_COUNT = 100;
    const int EVENT_COUNT = 1000;
    const string DATABASE_NAME = "projections";

    /// <summary>Runs the contention test against the in-memory projection store.</summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        TestContention(new InMemoryProjectionStore(), delay: false, projectors: CONSUMER_COUNT);
        // Uncomment to run the same test against MongoDB instead.
        //var mongo = PrepareMongo();
        //TestContention(new MongoProjectionStore(mongo), delay: false, projectors: CONSUMER_COUNT);
        //mongo.Database.Drop();
    }

    /// <summary>
    /// Drops any existing test database, recreates the "projection" collection
    /// and adds a descending index on the projection version.
    /// </summary>
    /// <returns>The freshly created projection collection.</returns>
    static MongoCollection<ProjectionContainer> PrepareMongo()
    {
        var client = new MongoClient("mongodb://quantum");
        var server = client.GetServer();
        if (server.GetDatabaseNames().Contains(DATABASE_NAME))
        {
            server.DropDatabase(DATABASE_NAME);
        }

        var db = server.GetDatabase(DATABASE_NAME);
        db.CreateCollection("projection");
        var mongoCollection = db.GetCollection<ProjectionContainer>("projection");
        mongoCollection.CreateIndex(IndexKeys<ProjectionContainer>.Descending(_ => _.Metadata.Version));
        return mongoCollection;
    }

    /// <summary>
    /// Writes <c>EVENT_COUNT</c> events to each of <c>AGGREGATE_COUNT</c> streams,
    /// committing at random points, while <paramref name="projectors"/> consumers
    /// project the dispatched commits into <paramref name="projections"/>.
    /// Prints the per-attempt result symbols, timings, and one status character
    /// per stored projection.
    /// </summary>
    /// <param name="projections">Store the projectors read from and write to.</param>
    /// <param name="delay">Passed to the dispatcher to enable random delays before queuing.</param>
    /// <param name="projectors">Number of concurrent consumers to start.</param>
    static void TestContention(IProjectionStore projections, bool delay, int projectors)
    {
        var dispatcher = new InMemoryDispatcher(delay, true);
        var es = Wireup.Init()
            .UsingInMemoryPersistence()
            .UsingAsynchronousDispatchScheduler()
            .DispatchTo(dispatcher)
            .Build();

        var consumers = Enumerable.Range(0, projectors)
            .Select(_ => new MessageConsumer(es, projections, _))
            .ToArray();
        Console.WriteLine("Testing {0} - Starting {1} projectors", projections.GetType().Name, projectors);
        foreach (var consumer in consumers)
        {
            consumer.Start();
        }

        var random = new Random();
        Console.WriteLine("Pushing {0} events on {1} aggregates", EVENT_COUNT, AGGREGATE_COUNT);
        var sw = new Stopwatch();
        var postWrites = new Stopwatch();
        sw.Start();
        int commitCount = 0;
        for (var i = 0; i < AGGREGATE_COUNT; i++)
        {
            var s = es.CreateStream(Guid.NewGuid());
            for (var j = 0; j < EVENT_COUNT; j++)
            {
                s.Add(new EventMessage { Body = j });
                // Commit at random points so commits carry between one and a few events.
                if (j % (random.Next(4) + 1) == 0)
                {
                    commitCount++;
                    s.CommitChanges(Guid.NewGuid());
                }
            }

            // Flush whatever the random commits left behind.
            if (s.UncommittedEvents.Any())
            {
                commitCount++;
                s.CommitChanges(Guid.NewGuid());
            }
        }

        postWrites.Start();
        //es.Dispose();
        Console.WriteLine("Wrote {0} commits, Waiting for results.", commitCount);

        // NOTE(review): DispatchedCommit counts commits handed to
        // MessageDispatcher.Queue, not commits the projectors have finished,
        // so projectors may still be working when results are read below.
        do
        {
            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
        while (dispatcher.DispatchedCommit < commitCount);

        sw.Stop();
        // Stop (not Start) so the reported post-write time excludes result printing.
        postWrites.Stop();

        var results = ReadResults();
        Console.WriteLine(results);
        Console.WriteLine("Ran projections {0} times for {1} commits in {2}, processing for {3} after finishing writing events.",
            results.Length,
            commitCount, sw.Elapsed, postWrites.Elapsed);

        // Verify against the same snapshot that was counted.
        var all = projections.ToList();
        if (all.Count != AGGREGATE_COUNT)
        {
            Console.WriteLine("Not enough projections.");
        }

        foreach (var projection in all)
        {
            var versionOk = projection.Metadata.Version == EVENT_COUNT;
            var projectionOk = projection.Projection == EVENT_COUNT;

            // Exactly one status character per projection:
            // N = neither matches, V = version wrong, P = projection wrong, . = ok.
            if (!versionOk && !projectionOk)
            {
                Console.Write('N');
            }
            else if (!versionOk)
            {
                Console.Write('V');
            }
            else if (!projectionOk)
            {
                Console.Write('P');
            }
            else
            {
                Console.Write('.');
            }
        }
    }

    /// <summary>
    /// Drains <c>MessageDispatcher.Results</c> into a single string.
    /// </summary>
    /// <returns>All result symbols dequeued, in queue order.</returns>
    static string ReadResults()
    {
        var sb = new StringBuilder();
        char symbol;
        // TryDequeue only fails when the queue is empty, so no separate IsEmpty check is needed.
        while (MessageDispatcher.Results.TryDequeue(out symbol))
        {
            sb.Append(symbol);
        }

        return sb.ToString();
    }
}
/// <summary>
/// Commit dispatcher that converts each commit into an
/// <see cref="EventStreamPersisted"/> message on <c>MessageDispatcher.Queue</c>
/// and counts how many commits it has queued.
/// </summary>
class InMemoryDispatcher : IDispatchCommits
{
    readonly bool _randomDelays;
    readonly Action<ICommit> _run;
    readonly Random _random = new Random();
    // System.Random is not thread-safe and ProcessCommit may run on many tasks at once.
    readonly object _randomGate = new object();
    int _dispatchedCommit;

    /// <param name="randomDelays">When true, sleeps for up to 200 ms before queuing each commit.</param>
    /// <param name="runAsync">When true, each commit is processed on its own task instead of inline.</param>
    public InMemoryDispatcher(bool randomDelays, bool runAsync)
    {
        _randomDelays = randomDelays;
        if (runAsync)
        {
            _run = ProcessCommitAsync;
        }
        else
        {
            _run = ProcessCommit;
        }
    }

    /// <summary>Processes the commit inline or on a new task, as chosen at construction.</summary>
    public void Dispatch(ICommit commit)
    {
        _run(commit);
    }

    /// <summary>Starts a long-running task that processes the commit.</summary>
    void ProcessCommitAsync(ICommit commit)
    {
        //Task.Run(() => ProcessCommit(commit));
        Task.Factory.StartNew(() => ProcessCommit(commit), TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Optionally sleeps, queues the stream id and revision of the commit,
    /// then increments the dispatched counter.
    /// </summary>
    void ProcessCommit(ICommit commit)
    {
        if (_randomDelays)
        {
            int delay;
            lock (_randomGate)
            {
                delay = _random.Next(200);
            }

            Thread.Sleep(delay);
        }

        MessageDispatcher.Queue.Add(new EventStreamPersisted(
            commit.StreamId,
            commit.StreamRevision));
        // Incremented after the message is queued, so the counter never runs ahead of the queue.
        Interlocked.Increment(ref _dispatchedCommit);
    }

    /// <summary>Number of commits that have been queued so far.</summary>
    public int DispatchedCommit
    {
        get { return _dispatchedCommit; }
    }

    /// <summary>Nothing to release.</summary>
    public void Dispose()
    {
    }
}
/// <summary>
/// Process-wide channels shared by the dispatcher, the consumers and the
/// test harness.
/// </summary>
public class MessageDispatcher
{
    /// <summary>Messages waiting to be projected; consumers block on it.</summary>
    public static readonly BlockingCollection<EventStreamPersisted> Queue
        = new BlockingCollection<EventStreamPersisted>();

    /// <summary>One character per projection outcome, drained by the harness for reporting.</summary>
    public static readonly ConcurrentQueue<char> Results
        = new ConcurrentQueue<char>();
}
/// <summary>
/// Background consumer that takes messages from <c>MessageDispatcher.Queue</c>
/// and runs a <see cref="ProjectionRunner"/> for each of them until stopped.
/// </summary>
public class MessageConsumer
{
    readonly CancellationTokenSource _cancelSource;
    readonly Task _consume;
    readonly IStoreEvents _es;
    readonly int _index;
    readonly IProjectionStore _projectionStore;

    /// <summary>True while a message is being projected.</summary>
    public bool Consuming { get; set; }

    /// <param name="es">Event store the projections are built from.</param>
    /// <param name="projectionStore">Store the projections are written to.</param>
    /// <param name="index">Identifier of this consumer, passed on to the runner.</param>
    public MessageConsumer(IStoreEvents es, IProjectionStore projectionStore, int index)
    {
        _es = es;
        _projectionStore = projectionStore;
        _index = index;
        _cancelSource = new CancellationTokenSource();
        _consume = new Task(Read, _cancelSource.Token);
    }

    /// <summary>Starts the consuming task.</summary>
    public void Start()
    {
        _consume.Start();
    }

    /// <summary>
    /// Consume loop: blocks for the next message, projects it, repeats until
    /// cancellation is requested.
    /// </summary>
    void Read()
    {
        var token = _cancelSource.Token;
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Passing the token lets Stop() interrupt a consumer blocked on an empty queue.
                var message = MessageDispatcher.Queue.Take(token);
                Consuming = true;
                try
                {
                    new ProjectionRunner(_es, _projectionStore, _index).Project(message.StreamId, message.Version);
                }
                finally
                {
                    // Reset even when projection throws, so the flag never sticks at true.
                    Consuming = false;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelled while waiting for a message: leave the loop quietly.
        }
    }

    /// <summary>Requests the consume loop to end.</summary>
    public void Stop()
    {
        _cancelSource.Cancel();
    }
}
/// <summary>
/// Brings the stored projection of one stream up to date, retrying when the
/// projection store rejects the update.
/// </summary>
class ProjectionRunner
{
    // Upper bound on attempts before Project gives up.
    const int MaxAttempts = 500;

    readonly IStoreEvents _eventStore;
    readonly int _index;
    readonly IProjectionStore _projectionStore;

    /// <param name="eventStore">Store the commits are read from.</param>
    /// <param name="projectionStore">Store the projection is loaded from and saved to.</param>
    /// <param name="index">Identifier of the owning consumer.</param>
    public ProjectionRunner(IStoreEvents eventStore, IProjectionStore projectionStore, int index)
    {
        _eventStore = eventStore;
        _projectionStore = projectionStore;
        _index = index;
    }

    /// <summary>
    /// Attempts the projection repeatedly, recording '.' for a successful
    /// attempt and 'x' for each failed one in <c>MessageDispatcher.Results</c>.
    /// </summary>
    /// <param name="streamId">Stream to project.</param>
    /// <param name="version">Stream revision announced by the dispatched commit.</param>
    /// <exception cref="InvalidOperationException">Every attempt failed.</exception>
    public void Project(string streamId, int version)
    {
        var attempt = 0;
        while (attempt < MaxAttempts)
        {
            var succeeded = TryProject(streamId, version);
            MessageDispatcher.Results.Enqueue(succeeded ? '.' : 'x');
            if (succeeded)
            {
                return;
            }

            attempt++;
        }

        throw new InvalidOperationException("Tried 500 times and gave up");
    }

    /// <summary>
    /// Single attempt: loads the latest projection, applies every event
    /// committed after it and saves the result conditionally on the version
    /// that was loaded.
    /// </summary>
    /// <returns>
    /// True when nothing had to be done ('k': already at or past
    /// <paramref name="version"/>; '0': no new events) or the store accepted
    /// the update; false when the store rejected it.
    /// </returns>
    bool TryProject(string streamId, int version)
    {
        var stored = _projectionStore.LoadLatest(streamId);
        var target = stored ?? new ProjectionContainer { StreamId = streamId };
        // -1 marks "no projection stored yet".
        var knownVersion = stored == null ? -1 : stored.Metadata.Version;

        if (knownVersion >= version)
        {
            MessageDispatcher.Results.Enqueue('k');
            return true;
        }

        var pendingCommits = _eventStore.Advanced
            .GetFrom(streamId, knownVersion + 1, int.MaxValue)
            .ToList();
        var pendingEvents = pendingCommits.SelectMany(commit => commit.Events).ToList();
        if (pendingEvents.Count == 0)
        {
            MessageDispatcher.Results.Enqueue('0');
            return true;
        }

        var latestRevision = pendingCommits.Max(commit => commit.StreamRevision);

        // Applying an event means counting it.
        target.Projection += pendingEvents.Count;
        target.Metadata.Version = latestRevision;

        return _projectionStore.TryUpdate(target, knownVersion);
    }
}
/// <summary>
/// Immutable notification that a stream has been persisted up to a given
/// version.
/// </summary>
public class EventStreamPersisted
{
    readonly string _streamId;
    readonly int _version;

    /// <param name="streamId">Identifier of the stream that was written.</param>
    /// <param name="version">Version the stream reached.</param>
    public EventStreamPersisted(string streamId, int version)
    {
        _streamId = streamId;
        _version = version;
    }

    /// <summary>Identifier of the stream that was written.</summary>
    public string StreamId
    {
        get { return _streamId; }
    }

    /// <summary>Version the stream reached.</summary>
    public int Version
    {
        get { return _version; }
    }
}
/// <summary>
/// Storage for per-stream projections with version-checked updates.
/// Enumerating the store yields the stored projections.
/// </summary>
public interface IProjectionStore : IEnumerable<ProjectionContainer>
{
    /// <summary>Loads the most recent projection stored for a stream.</summary>
    /// <param name="streamId">Stream whose projection is wanted.</param>
    /// <returns>The projection, or null when none is stored.</returns>
    ProjectionContainer LoadLatest(string streamId);

    /// <summary>
    /// Stores <paramref name="data"/> provided the stored projection is still
    /// at <paramref name="existingVersion"/>.
    /// </summary>
    /// <param name="data">Projection to store.</param>
    /// <param name="existingVersion">
    /// Version the caller loaded; -1 when the caller found no stored projection.
    /// </param>
    /// <returns>True when the projection was stored; false when the update was rejected.</returns>
    bool TryUpdate(ProjectionContainer data, int existingVersion = -1);
}
/// <summary>
/// <see cref="IProjectionStore"/> backed by a MongoDB collection, using a
/// conditional find-and-modify for version-checked updates.
/// </summary>
public class MongoProjectionStore : IProjectionStore
{
    readonly MongoCollection<ProjectionContainer> _db;

    // MongoDB reports duplicate-key violations with error code 11000.
    const int ERROR_DUPLICATE_KEY = 11000;

    /// <param name="db">Collection holding the projection documents.</param>
    public MongoProjectionStore(MongoCollection<ProjectionContainer> db)
    {
        _db = db;
    }

    /// <summary>Loads the projection whose id is <paramref name="streamId"/>.</summary>
    /// <returns>The stored projection, or null when none exists.</returns>
    public ProjectionContainer LoadLatest(string streamId)
    {
        return _db.FindOneById(streamId);
    }

    /// <summary>
    /// Updates the document matching both the stream id and
    /// <paramref name="existingVersion"/>, inserting it when no document matches.
    /// </summary>
    /// <param name="data">Projection to store.</param>
    /// <param name="existingVersion">Version the stored document is expected to have.</param>
    /// <returns>
    /// True when the command succeeded; false when it did not, including when
    /// the upsert hit a duplicate key because the stream already has a
    /// document at a different version.
    /// </returns>
    public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
    {
        var documentToUpdate = Query.And(
            Query<ProjectionContainer>.EQ(_ => _.StreamId, data.StreamId),
            Query<ProjectionContainer>.EQ(_ => _.Metadata.Version, existingVersion));
        try
        {
            var result = _db.FindAndModify(new FindAndModifyArgs
            {
                VersionReturned = FindAndModifyDocumentVersion.Modified,
                Query = documentToUpdate,
                Update = Update<ProjectionContainer>.Set(_ => _.Metadata, data.Metadata)
                                                    .Set(_ => _.Projection, data.Projection),
                Upsert = true
            });
            if (result.Ok)
            {
                return true;
            }
        }
        catch (MongoCommandException e)
        {
            // A duplicate key means another writer got there first: report a
            // failed update. Anything else is a real error.
            if (e.CommandResult.Code != ERROR_DUPLICATE_KEY)
            {
                throw;
            }
        }

        return false;
    }

    /// <summary>Enumerates every document in the collection.</summary>
    public IEnumerator<ProjectionContainer> GetEnumerator()
    {
        return _db.FindAll().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
/// <summary>
/// In-memory <see cref="IProjectionStore"/> holding at most one projection
/// per stream and replacing it with an atomic compare-and-swap.
/// </summary>
public class InMemoryProjectionStore : IEnumerable<ProjectionContainer>, IProjectionStore
{
    // Keyed by stream id so a stream can never have two projections and a
    // lookup does not scan every stored value. Stored instances are private
    // copies and are never mutated after being stored.
    readonly ConcurrentDictionary<string, ProjectionContainer> _projections =
        new ConcurrentDictionary<string, ProjectionContainer>();

    /// <summary>Loads a copy of the projection stored for a stream.</summary>
    /// <param name="streamId">Stream whose projection is wanted.</param>
    /// <returns>A copy of the stored projection, or null when none is stored.</returns>
    public ProjectionContainer LoadLatest(string streamId)
    {
        if (streamId == null)
        {
            // Nothing can be stored under a null key.
            return null;
        }

        ProjectionContainer stored;
        return _projections.TryGetValue(streamId, out stored)
            ? Copy(stored)
            : null;
    }

    /// <summary>
    /// Stores a copy of <paramref name="data"/> if the stream's stored
    /// projection is still at <paramref name="existingVersion"/>.
    /// </summary>
    /// <param name="data">Projection to store.</param>
    /// <param name="existingVersion">
    /// Version the caller loaded; -1 means the caller saw no stored
    /// projection, in which case the call only succeeds as a first insert.
    /// </param>
    /// <returns>True when the projection was stored; false when another writer changed it first.</returns>
    /// <exception cref="ArgumentNullException"><c>data.StreamId</c> is null.</exception>
    public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
    {
        var streamId = data.StreamId;
        var replacement = Copy(data);

        if (existingVersion == -1)
        {
            // First insert: fails if any projection already exists for the stream.
            return _projections.TryAdd(streamId, replacement);
        }

        ProjectionContainer current;
        if (!_projections.TryGetValue(streamId, out current))
        {
            return false;
        }

        if (current.Metadata.Version != existingVersion)
        {
            return false;
        }

        // ProjectionContainer does not override Equals, so this compares by
        // reference: the swap only succeeds if `current` is still the stored instance.
        return _projections.TryUpdate(streamId, replacement, current);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>Enumerates the stored projections.</summary>
    public IEnumerator<ProjectionContainer> GetEnumerator()
    {
        return _projections.Values.GetEnumerator();
    }

    /// <summary>Creates an independent copy so callers never share state with the store.</summary>
    static ProjectionContainer Copy(ProjectionContainer value)
    {
        return new ProjectionContainer
        {
            StreamId = value.StreamId,
            Projection = value.Projection,
            Metadata =
                new ProjectionMetadata
                {
                    Version = value.Metadata.Version,
                }
        };
    }
}
/// <summary>
/// A stored projection: the projected value of one stream together with
/// its bookkeeping metadata.
/// </summary>
public class ProjectionContainer
{
    /// <summary>Creates a container with fresh, default metadata.</summary>
    public ProjectionContainer()
    {
        Metadata = new ProjectionMetadata();
    }

    /// <summary>Identifier of the projected stream; doubles as the document id.</summary>
    [BsonId]
    public string StreamId { get; set; }

    /// <summary>Bookkeeping data, notably the version the projection has reached.</summary>
    public ProjectionMetadata Metadata { get; set; }

    /// <summary>The projected value; the runner increments it once per applied event.</summary>
    public int Projection { get; set; }
}
/// <summary>Bookkeeping data stored alongside a projection.</summary>
public class ProjectionMetadata
{
    /// <summary>Stream revision the projection has been brought up to.</summary>
    public int Version { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment