Skip to content

Instantly share code, notes, and snippets.

@serialseb
Last active August 29, 2015 14:03
Show Gist options
  • Save serialseb/d9a5926849ee9a0c6a6d to your computer and use it in GitHub Desktop.
Save serialseb/d9a5926849ee9a0c6a6d to your computer and use it in GitHub Desktop.
Projecting things
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using NEventStore;
using NEventStore.Dispatcher;
namespace TestNewProjections
{
class Program
{
// Number of MessageConsumer instances (projectors) that Main asks TestContention to start.
const int CONSUMER_COUNT = 10;
// Number of event streams (aggregates) that TestContention creates.
const int AGGREGATE_COUNT = 100;
// Number of events appended to every stream; also the value each projection is expected to reach.
const int EVENT_COUNT = 1000;
// Name of the MongoDB database that PrepareMongo drops and recreates.
const string DATABASE_NAME = "projections";
/// <summary>
/// Entry point: runs the contention test against the in-memory projection store
/// with <c>CONSUMER_COUNT</c> projectors and without random dispatch delays.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    IProjectionStore store = new InMemoryProjectionStore();
    TestContention(store, delay: false, projectors: CONSUMER_COUNT);

    // Alternative run against MongoDB, kept disabled:
    //var mongo = PrepareMongo();
    //TestContention(new MongoProjectionStore(mongo), delay: false, projectors: CONSUMER_COUNT);
    //mongo.Database.Drop();
}
/// <summary>
/// Recreates the <c>DATABASE_NAME</c> database from scratch and returns its
/// "projection" collection with a descending index on <c>Metadata.Version</c>.
/// </summary>
/// <returns>The freshly created projection collection.</returns>
static MongoCollection<ProjectionContainer> PrepareMongo()
{
    const string collectionName = "projection";

    var server = new MongoClient("mongodb://quantum").GetServer();

    // Start from a clean slate: remove any database left over from a previous run.
    var alreadyExists = server.GetDatabaseNames().Contains(DATABASE_NAME);
    if (alreadyExists)
    {
        server.DropDatabase(DATABASE_NAME);
    }

    var database = server.GetDatabase(DATABASE_NAME);
    database.CreateCollection(collectionName);

    var projections = database.GetCollection<ProjectionContainer>(collectionName);
    var byVersionDescending = IndexKeys<ProjectionContainer>.Descending(p => p.Metadata.Version);
    projections.CreateIndex(byVersionDescending);
    return projections;
}
/// <summary>
/// Writes <c>EVENT_COUNT</c> events to each of <c>AGGREGATE_COUNT</c> streams while
/// <paramref name="projectors"/> consumers project them concurrently into
/// <paramref name="projections"/>, then prints timing and a per-projection verdict.
/// </summary>
/// <param name="projections">Store the projectors read from and write to; also enumerated for the final report.</param>
/// <param name="delay">Passed to <c>InMemoryDispatcher</c> as its random-delay switch.</param>
/// <param name="projectors">Number of <c>MessageConsumer</c> instances to start.</param>
/// <remarks>
/// Final report legend, one character per stored projection:
/// 'N' neither version nor projected value equals <c>EVENT_COUNT</c>,
/// 'V' only the version is wrong, 'P' only the projected value is wrong, '.' both are correct.
/// </remarks>
static void TestContention(IProjectionStore projections, bool delay, int projectors)
{
    var dispatcher = new InMemoryDispatcher(delay, true);
    var es = Wireup.Init()
        .UsingInMemoryPersistence()
        .UsingAsynchronousDispatchScheduler()
        .DispatchTo(dispatcher)
        .Build();

    var consumers = Enumerable.Range(0, projectors)
        .Select(_ => new MessageConsumer(es, projections, _))
        .ToArray();
    Console.WriteLine("Testing {0} - Starting {1} projectors", projections.GetType().Name, projectors);
    foreach (var consumer in consumers) consumer.Start();

    var random = new Random();
    Console.WriteLine("Pushing {0} events on {1} aggregates", EVENT_COUNT, AGGREGATE_COUNT);
    var sw = new Stopwatch();          // total time: writing + dispatching
    var postWrites = new Stopwatch();  // time spent after the last write
    sw.Start();
    int commitCount = 0;
    for (var i = 0; i < AGGREGATE_COUNT; i++)
    {
        var s = es.CreateStream(Guid.NewGuid());
        for (var j = 0; j < EVENT_COUNT; j++)
        {
            s.Add(new EventMessage { Body = j });
            // Commit at irregular intervals so that commits contain a varying number of events.
            if (j % (random.Next(4) + 1) == 0)
            {
                commitCount++;
                s.CommitChanges(Guid.NewGuid());
            }
        }
        // Flush whatever the random commit points left behind.
        if (s.UncommittedEvents.Any())
        {
            commitCount++;
            s.CommitChanges(Guid.NewGuid());
        }
    }
    postWrites.Start();
    // NOTE(review): the event store is never disposed here; presumably intentional so the
    // asynchronous dispatch scheduler keeps running while we wait - confirm.
    Console.WriteLine("Wrote {0} commits, Waiting for results.", commitCount);

    // NOTE(review): this waits until every commit has been handed to the dispatcher queue;
    // it does not wait for the consumers to finish projecting what they dequeued.
    do
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
    while (dispatcher.DispatchedCommit < commitCount);
    sw.Stop();
    // Fix: the original called Start() a second time here, so the stopwatch was never stopped.
    postWrites.Stop();

    var results = ReadResults();
    Console.WriteLine(results);
    Console.WriteLine("Ran projections {0} times for {1} commits in {2}, processing for {3} after finishing writing events.",
        results.Length,
        commitCount, sw.Elapsed, postWrites.Elapsed);

    // Materialise once and report from the same snapshot that was counted.
    var all = projections.ToList();
    if (all.Count != AGGREGATE_COUNT)
        Console.WriteLine("Not enough projections.");
    foreach (var projection in all)
    {
        var versionOk = projection.Metadata.Version == EVENT_COUNT;
        var valueOk = projection.Projection == EVENT_COUNT;

        // Fix: exactly one marker per projection (the original printed "NV" when both were wrong).
        if (!versionOk && !valueOk)
            Console.Write('N');
        else if (!versionOk)
            Console.Write('V');
        else if (!valueOk)
            Console.Write('P');
        else
            Console.Write('.');
    }
}
/// <summary>
/// Drains <c>MessageDispatcher.Results</c> and returns the dequeued characters,
/// in dequeue order, as a single string.
/// </summary>
/// <returns>All result markers that were queued at the time of the call.</returns>
static string ReadResults()
{
    var markers = new StringBuilder();
    char marker;
    // A failed dequeue means the queue was observed empty, which ends the drain.
    while (MessageDispatcher.Results.TryDequeue(out marker))
    {
        markers.Append(marker);
    }
    return markers.ToString();
}
}
/// <summary>
/// Commit dispatcher that turns every dispatched commit into an
/// <c>EventStreamPersisted</c> message on <c>MessageDispatcher.Queue</c>,
/// optionally on a separate task and optionally after a random delay.
/// </summary>
class InMemoryDispatcher : IDispatchCommits
{
    readonly bool _randomDelays;
    readonly bool _runAsync;
    readonly Random _random = new Random();
    // System.Random is not thread-safe and ProcessCommit may run on many tasks at once.
    readonly object _randomGate = new object();
    int _dispatchedCommit;

    /// <param name="randomDelays">When true, each commit is delayed by a random 0-199 ms before being queued.</param>
    /// <param name="runAsync">When true, each commit is processed on its own task instead of the calling thread.</param>
    public InMemoryDispatcher(bool randomDelays, bool runAsync)
    {
        _randomDelays = randomDelays;
        // Fix: the original declared this field but never assigned it.
        _runAsync = runAsync;
    }

    /// <summary>Processes <paramref name="commit"/> synchronously or asynchronously, as configured.</summary>
    public void Dispatch(ICommit commit)
    {
        if (_runAsync)
            ProcessCommitAsync(commit);
        else
            ProcessCommit(commit);
    }

    // Fire-and-forget: the task is not awaited or observed by the caller.
    void ProcessCommitAsync(ICommit commit)
    {
        //Task.Run(() => ProcessCommit(commit));
        Task.Factory.StartNew(() => ProcessCommit(commit), TaskCreationOptions.LongRunning);
    }

    // Queues the notification first and only then counts the commit as dispatched.
    void ProcessCommit(ICommit commit)
    {
        if (_randomDelays)
            Thread.Sleep(NextDelayMilliseconds());
        MessageDispatcher.Queue.Add(new EventStreamPersisted(
            commit.StreamId,
            commit.StreamRevision));
        Interlocked.Increment(ref _dispatchedCommit);
    }

    // Serialises access to the shared Random instance.
    int NextDelayMilliseconds()
    {
        lock (_randomGate)
        {
            return _random.Next(200);
        }
    }

    /// <summary>Number of commits that have been queued so far.</summary>
    public int DispatchedCommit
    {
        // Interlocked read so a polling thread always sees the latest count.
        get { return Interlocked.CompareExchange(ref _dispatchedCommit, 0, 0); }
    }

    /// <summary>Nothing to release.</summary>
    public void Dispose()
    {
    }
}
/// <summary>
/// Process-wide static channels shared by the dispatcher, the consumers and the test driver.
/// </summary>
public class MessageDispatcher
{
// Notifications added by InMemoryDispatcher.ProcessCommit and taken by MessageConsumer.Read.
public static readonly BlockingCollection<EventStreamPersisted> Queue =
new BlockingCollection<EventStreamPersisted>();
// One-character markers enqueued by ProjectionRunner ('.', 'x', 'k', '0') and drained by Program.ReadResults.
public static readonly ConcurrentQueue<char> Results = new ConcurrentQueue<char>();
}
/// <summary>
/// Background worker that takes <c>EventStreamPersisted</c> messages from
/// <c>MessageDispatcher.Queue</c> and runs a <c>ProjectionRunner</c> for each of them.
/// </summary>
public class MessageConsumer
{
    readonly CancellationTokenSource _cancelSource;
    readonly Task _consume;
    readonly IStoreEvents _es;
    readonly int _index;
    readonly IProjectionStore _projectionStore;

    /// <summary>True while a dequeued message is being projected.</summary>
    public bool Consuming { get; set; }

    /// <param name="es">Event store handed to every <c>ProjectionRunner</c>.</param>
    /// <param name="projectionStore">Projection store handed to every <c>ProjectionRunner</c>.</param>
    /// <param name="index">Identifier of this consumer, passed through to the runner.</param>
    public MessageConsumer(IStoreEvents es, IProjectionStore projectionStore, int index)
    {
        _es = es;
        _projectionStore = projectionStore;
        _index = index;
        _cancelSource = new CancellationTokenSource();
        // The loop blocks for the consumer's whole lifetime, so ask for a dedicated thread
        // rather than tying up a thread-pool worker.
        _consume = new Task(Read, _cancelSource.Token, TaskCreationOptions.LongRunning);
    }

    /// <summary>Starts the consume loop. Can be called once.</summary>
    public void Start()
    {
        _consume.Start();
    }

    // Runs until Stop() cancels the token: Take(token) then throws OperationCanceledException,
    // which ends the task. The original called Take() without the token and could never stop.
    void Read()
    {
        var token = _cancelSource.Token;
        while (true)
        {
            var message = MessageDispatcher.Queue.Take(token);
            Consuming = true;
            try
            {
                new ProjectionRunner(_es, _projectionStore, _index).Project(message.StreamId, message.Version);
            }
            finally
            {
                // Reset even when Project throws, so the flag never sticks at true.
                Consuming = false;
            }
        }
    }

    /// <summary>Requests the consume loop to end; a blocked wait for the next message is cancelled.</summary>
    public void Stop()
    {
        _cancelSource.Cancel();
    }
}
/// <summary>
/// Brings the stored projection of one stream up to a requested version, retrying
/// when the optimistic update of the projection store is rejected.
/// </summary>
class ProjectionRunner
{
    readonly IStoreEvents _eventStore;
    readonly int _index;
    readonly IProjectionStore _projectionStore;

    public ProjectionRunner(IStoreEvents eventStore, IProjectionStore projectionStore, int index)
    {
        _eventStore = eventStore;
        _projectionStore = projectionStore;
        _index = index;
    }

    /// <summary>
    /// Attempts the projection up to 500 times. Each attempt records '.' (succeeded) or
    /// 'x' (rejected, will retry) in <c>MessageDispatcher.Results</c>.
    /// </summary>
    /// <param name="streamId">Stream whose projection should be updated.</param>
    /// <param name="version">Stream revision the projection should reach.</param>
    /// <exception cref="InvalidOperationException">All 500 attempts were rejected.</exception>
    public void Project(string streamId, int version)
    {
        var attempt = 0;
        while (attempt < 500)
        {
            var succeeded = TryProject(streamId, version);
            MessageDispatcher.Results.Enqueue(succeeded ? '.' : 'x');
            if (succeeded)
            {
                return;
            }
            attempt++;
        }
        throw new InvalidOperationException("Tried 500 times and gave up");
    }

    // One attempt. Returns true when nothing is left to do for this message:
    // 'k' is recorded when the stored projection is already at or past the version,
    // '0' when no events were found; otherwise the result of the store update is returned.
    bool TryProject(string streamId, int version)
    {
        var stored = _projectionStore.LoadLatest(streamId);
        var projection = stored ?? new ProjectionContainer { StreamId = streamId };
        // -1 marks "no projection stored yet".
        var existingVersion = stored == null ? -1 : stored.Metadata.Version;

        if (version <= existingVersion)
        {
            MessageDispatcher.Results.Enqueue('k');
            return true;
        }

        var commits = _eventStore.Advanced.GetFrom(streamId, existingVersion + 1, int.MaxValue).ToList();
        var eventMessages = commits.SelectMany(commit => commit.Events).ToList();
        if (eventMessages.Count == 0)
        {
            MessageDispatcher.Results.Enqueue('0');
            return true;
        }

        // apply events to projection: the projected value counts the events seen
        projection.Projection += eventMessages.Count;
        projection.Metadata.Version = commits.Max(commit => commit.StreamRevision);
        return _projectionStore.TryUpdate(projection, existingVersion);
    }
}
/// <summary>
/// Immutable notification saying that a stream has been persisted up to a given version.
/// </summary>
public class EventStreamPersisted
{
    readonly string _streamId;
    readonly int _version;

    /// <param name="streamId">Identifier of the stream that was persisted.</param>
    /// <param name="version">Version carried by the notification.</param>
    public EventStreamPersisted(string streamId, int version)
    {
        _streamId = streamId;
        _version = version;
    }

    /// <summary>Identifier of the stream that was persisted.</summary>
    public string StreamId
    {
        get { return _streamId; }
    }

    /// <summary>Version carried by the notification.</summary>
    public int Version
    {
        get { return _version; }
    }
}
/// <summary>
/// Storage for per-stream projections with an optimistic, version-checked update.
/// Enumerating the store yields the stored projections.
/// </summary>
public interface IProjectionStore : IEnumerable<ProjectionContainer>
{
/// <summary>Loads the stored projection for <paramref name="streamId"/>; callers in this file treat null as "none stored yet".</summary>
ProjectionContainer LoadLatest(string streamId);
/// <summary>
/// Tries to store <paramref name="data"/>, expecting the currently stored version to be
/// <paramref name="existingVersion"/> (-1 is what ProjectionRunner passes when nothing was stored).
/// </summary>
/// <returns>True when the update was applied; false when it was rejected.</returns>
bool TryUpdate(ProjectionContainer data, int existingVersion = -1);
}
/// <summary>
/// <c>IProjectionStore</c> backed by a MongoDB collection, using a version-qualified
/// find-and-modify upsert as the optimistic concurrency check.
/// </summary>
public class MongoProjectionStore : IProjectionStore
{
    readonly MongoCollection<ProjectionContainer> _db;

    // MongoDB server error code for a unique-key violation ("E11000 duplicate key error").
    // Fix: the original value 1100 never matched, so a lost race was rethrown instead of
    // being reported as a failed update.
    const int ERROR_DUPLICATE_KEY = 11000;

    /// <param name="db">Collection that holds one document per stream.</param>
    public MongoProjectionStore(MongoCollection<ProjectionContainer> db)
    {
        _db = db;
    }

    /// <summary>Returns the document whose id is <paramref name="streamId"/>.</summary>
    public ProjectionContainer LoadLatest(string streamId)
    {
        return _db.FindOneById(streamId);
    }

    /// <summary>
    /// Writes <paramref name="data"/> only if the stored document for the stream is still at
    /// <paramref name="existingVersion"/>.
    /// </summary>
    /// <returns>
    /// True when the command reports success; false when it does not or when it fails with a
    /// duplicate key error. Any other command failure is rethrown.
    /// </returns>
    public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
    {
        var documentToUpdate = Query.And(
            Query<ProjectionContainer>.EQ(_ => _.StreamId, data.StreamId),
            Query<ProjectionContainer>.EQ(_ => _.Metadata.Version, existingVersion));
        try
        {
            // With Upsert, a query that matches nothing leads to an insert attempt. When a
            // document for this stream already exists at another version, that insert collides
            // on the id and surfaces as a duplicate key error, i.e. another writer won.
            var result = _db.FindAndModify(new FindAndModifyArgs
            {
                VersionReturned = FindAndModifyDocumentVersion.Modified,
                Query = documentToUpdate,
                Update = Update<ProjectionContainer>.Set(_ => _.Metadata, data.Metadata)
                    .Set(_ => _.Projection, data.Projection),
                Upsert = true
            });
            if (result.Ok) return true;
        }
        catch (MongoCommandException e)
        {
            if (e.CommandResult.Code != ERROR_DUPLICATE_KEY)
                throw;
        }
        return false;
    }

    /// <summary>Enumerates every document in the collection.</summary>
    public IEnumerator<ProjectionContainer> GetEnumerator()
    {
        return _db.FindAll().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
/// <summary>
/// In-memory <c>IProjectionStore</c> holding one projection per stream, updated with an
/// atomic compare-and-swap.
/// </summary>
/// <remarks>
/// The original keyed entries by (stream id, version) and replaced them with a separate
/// remove followed by an add. That allowed two concurrent first inserts at different versions
/// to both succeed, and made a stream briefly invisible to <c>LoadLatest</c> during an update;
/// both could leave several entries for one stream. Keying by stream id alone makes every
/// transition a single atomic dictionary operation and turns <c>LoadLatest</c> into a lookup.
/// </remarks>
public class InMemoryProjectionStore : IEnumerable<ProjectionContainer>, IProjectionStore
{
    // Stored instances are private copies and are never mutated after being stored, so
    // reference identity can serve as the comparison value for TryUpdate.
    readonly ConcurrentDictionary<string, ProjectionContainer> _projections =
        new ConcurrentDictionary<string, ProjectionContainer>();

    /// <summary>Returns a copy of the projection stored for <paramref name="streamId"/>, or null when there is none.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="streamId"/> is null.</exception>
    public ProjectionContainer LoadLatest(string streamId)
    {
        ProjectionContainer stored;
        return _projections.TryGetValue(streamId, out stored)
            ? Copy(stored)
            : null;
    }

    /// <summary>
    /// Stores a copy of <paramref name="data"/> if the stream is still at
    /// <paramref name="existingVersion"/>; -1 means "no projection stored yet".
    /// </summary>
    /// <returns>True when the copy was stored; false when another writer got there first.</returns>
    public bool TryUpdate(ProjectionContainer data, int existingVersion = -1)
    {
        var streamId = data.StreamId;
        var replacement = Copy(data);

        // First write for the stream: succeeds only if nobody has inserted it yet.
        if (existingVersion == -1)
            return _projections.TryAdd(streamId, replacement);

        ProjectionContainer current;
        if (!_projections.TryGetValue(streamId, out current))
            return false;
        if (current.Metadata.Version != existingVersion)
            return false;

        // Swap only if the entry is still the exact instance whose version was just checked.
        return _projections.TryUpdate(streamId, replacement, current);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>Enumerates a snapshot of the stored projections.</summary>
    public IEnumerator<ProjectionContainer> GetEnumerator()
    {
        return _projections.Values.GetEnumerator();
    }

    // Detached copy, so callers cannot mutate what is held in the dictionary through it.
    static ProjectionContainer Copy(ProjectionContainer value)
    {
        return new ProjectionContainer
        {
            StreamId = value.StreamId,
            Projection = value.Projection,
            Metadata =
                new ProjectionMetadata
                {
                    Version = value.Metadata.Version,
                }
        };
    }
}
/// <summary>
/// Stored projection of a single stream: the projected value plus bookkeeping metadata.
/// </summary>
public class ProjectionContainer
{
// Always starts with a non-null Metadata instance.
public ProjectionContainer()
{
Metadata = new ProjectionMetadata();
}
// Stream identifier; marked as the BSON document id.
[BsonId]
public string StreamId { get; set; }
// Bookkeeping data; ProjectionRunner stores the projected stream revision in Metadata.Version.
public ProjectionMetadata Metadata { get; set; }
// Projected value; ProjectionRunner increases it by one for every event it applies.
public int Projection { get; set; }
}
/// <summary>Bookkeeping data stored alongside a projection.</summary>
public class ProjectionMetadata
{
// Version of the projection; ProjectionRunner sets it to the highest stream revision it has applied.
public int Version { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment