Last active
August 29, 2015 14:08
-
-
Save andreabalducci/2b2ad587f47afdbec185 to your computer and use it in GitHub Desktop.
Dealing with eventual consistency using read model "promises" with NEventStore & MongoDB (WIP)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Jarvis.DocumentStore.Core.CommandHandlers.HandleHandlers; | |
using Jarvis.DocumentStore.Core.Domain.Document.Commands; | |
using Jarvis.DocumentStore.Core.Domain.Handle; | |
using Jarvis.DocumentStore.Core.ReadModel; | |
namespace Jarvis.DocumentStore.Core.CommandHandlers.DocumentHandlers | |
{ | |
/// <summary>
/// Handles the <see cref="CreateDocument"/> command: creates the document aggregate
/// and then links the handle carried by the command to that document.
/// </summary>
public class CreateDocumentCommandHandler : DocumentCommandHandler<CreateDocument>
{
    readonly IHandleMapper _mapper;

    /// <param name="mapper">Maps a document handle to the id of its Handle aggregate.</param>
    /// <param name="writer">Accepted by the constructor but not used by this handler.</param>
    public CreateDocumentCommandHandler(IHandleMapper mapper, IHandleWriter writer)
    {
        _mapper = mapper;
    }

    /// <summary>
    /// Creates the document, then links its handle.
    /// </summary>
    protected override void Execute(CreateDocument cmd)
    {
        // NOTE(review): the trailing 'true' presumably lets FindAndModify create the
        // aggregate when it does not exist yet - confirm against the base class.
        FindAndModify(
            cmd.AggregateId,
            document => document.Create(cmd.AggregateId, cmd.BlobId, cmd.HandleInfo, cmd.Hash),
            true
        );

        LinkHandle(cmd);
    }

    /// <summary>
    /// Loads the Handle aggregate mapped from the command's handle, initializes it on
    /// first use, links it to the document, stores the custom data and saves it.
    /// </summary>
    void LinkHandle(CreateDocument cmd)
    {
        var documentHandle = cmd.HandleInfo.Handle;
        var handleId = _mapper.Map(documentHandle);
        var aggregate = Repository.GetById<Handle>(handleId);

        // Only a handle that has never been created needs to be initialized.
        if (!aggregate.HasBeenCreated)
        {
            aggregate.Initialize(handleId, documentHandle);
        }

        aggregate.Link(cmd.AggregateId);
        aggregate.SetCustomData(cmd.HandleInfo.CustomData);

        // The third argument is a no-op callback.
        Repository.Save(aggregate, cmd.MessageId, ignored => { });
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.ComponentModel; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using CQRS.Kernel.Events; | |
using Jarvis.DocumentStore.Core.Domain.Handle.Events; | |
using Jarvis.DocumentStore.Core.ReadModel; | |
using NEventStore; | |
namespace Jarvis.DocumentStore.Core.EventHandlers | |
{ | |
/// <summary>
/// Projects Handle aggregate events onto the handle read model through
/// <see cref="IHandleWriter"/>.
/// </summary>
public class HandleProjection : AbstractProjection,
    IEventHandler<HandleInitialized>,
    IEventHandler<HandleLinked>,
    IEventHandler<HandleCustomDataSet>,
    IEventHandler<HandleDeleted>
{
    private readonly IHandleWriter _writer;

    public HandleProjection(IHandleWriter writer)
    {
        _writer = writer;
    }

    /// <summary>Prepares the read model by delegating to the writer's Init.</summary>
    public override void SetUp()
    {
        _writer.Init();
    }

    /// <summary>Drops the read model by delegating to the writer's Drop.</summary>
    public override void Drop()
    {
        _writer.Drop();
    }

    /// <summary>No read-model change is made when a handle is initialized.</summary>
    public void On(HandleInitialized e)
    {
    }

    /// <summary>
    /// Confirms the link between handle and document, stamping the read model with
    /// the event's checkpoint.
    /// </summary>
    public void On(HandleLinked e)
    {
        var projectedAt = LongCheckpoint.Parse(e.CheckpointToken).LongValue;
        _writer.ConfirmLink(e.Handle, e.DocumentId, projectedAt);
    }

    /// <summary>Stores the handle's custom data in the read model.</summary>
    public void On(HandleCustomDataSet e)
    {
        _writer.UpdateCustomData(e.Handle, e.CustomData);
    }

    /// <summary>
    /// Removes the handle from the read model, passing the event's checkpoint.
    /// </summary>
    public void On(HandleDeleted e)
    {
        var projectedAt = LongCheckpoint.Parse(e.CheckpointToken).LongValue;
        _writer.Delete(e.Handle, projectedAt);
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Linq; | |
using CQRS.Shared.ReadModel; | |
using Jarvis.DocumentStore.Core.Domain.Document; | |
using Jarvis.DocumentStore.Core.Domain.Handle; | |
using Jarvis.DocumentStore.Core.Model; | |
using MongoDB.Bson; | |
using MongoDB.Bson.Serialization.Attributes; | |
using MongoDB.Driver; | |
using MongoDB.Driver.Builders; | |
using MongoDB.Driver.Linq; | |
namespace Jarvis.DocumentStore.Core.ReadModel | |
{ | |
/// <summary>
/// Read model for a document handle. The handle itself is the document id in the
/// collection.
/// </summary>
public class HandleReadModel : IReadModel
{
    [BsonId]
    public DocumentHandle Handle { get; private set; }

    public DocumentId DocumentId { get; private set; }

    // Checkpoint written by HandleWriter.Promise. The spelling is kept as-is because
    // the property name is part of the public surface.
    public long CreatetAt { get; private set; }

    // Checkpoint written by HandleWriter.ConfirmLink.
    public long ProjectedAt { get; private set; }

    public HandleCustomData CustomData { get; private set; }

    public FileNameWithExtension FileName { get; private set; }

    /// <summary>Creates a read model that only knows its handle.</summary>
    public HandleReadModel(DocumentHandle handle)
    {
        Handle = handle;
    }

    /// <summary>Creates a read model already bound to a document and a file name.</summary>
    public HandleReadModel(DocumentHandle handle, DocumentId documentid, FileNameWithExtension fileName)
        : this(handle)
    {
        DocumentId = documentid;
        FileName = fileName;
    }

    /// <summary>
    /// True while the promised checkpoint is ahead of the last projected checkpoint.
    /// </summary>
    public bool IsPending()
    {
        return ProjectedAt < CreatetAt;
    }
}
/// <summary>
/// Write-side access to the handle read model. The member notes below describe
/// what the HandleWriter implementation in this file does.
/// </summary>
public interface IHandleWriter
{
    // --- lifecycle ---

    /// <summary>Prepares the underlying collection (HandleWriter creates an index).</summary>
    void Init();

    /// <summary>Drops the underlying collection.</summary>
    void Drop();

    // --- writes ---

    /// <summary>
    /// Records, ahead of the projection, that <paramref name="handle"/> will point to
    /// document <paramref name="id"/>; <paramref name="createdAt"/> is stored as CreatetAt.
    /// </summary>
    void Promise(DocumentHandle handle, FileNameWithExtension fileName, DocumentId id, long createdAt);

    /// <summary>
    /// Confirms the link once projected; <paramref name="projectedAt"/> is stored as ProjectedAt.
    /// </summary>
    void ConfirmLink(DocumentHandle handle, DocumentId id, long projectedAt);

    /// <summary>Replaces the custom data stored for the handle.</summary>
    void UpdateCustomData(DocumentHandle handle, HandleCustomData customData);

    /// <summary>Removes the handle's read model.</summary>
    void Delete(DocumentHandle handle, long projectedAt);

    // --- reads ---

    /// <summary>Looks up a single read model by its handle.</summary>
    HandleReadModel FindOneById(DocumentHandle handle);

    /// <summary>All read models, ordered by handle.</summary>
    IQueryable<HandleReadModel> AllSortedByHandle { get; }
}
/// <summary>
/// MongoDB-backed <see cref="IHandleWriter"/> that stores <see cref="HandleReadModel"/>
/// documents keyed by handle.
/// </summary>
public class HandleWriter : IHandleWriter
{
    readonly MongoCollection<HandleReadModel> _collection;

    public HandleWriter(MongoDatabase readModelDb)
    {
        var collectionName = CollectionNames.GetCollectionName<HandleReadModel>();
        _collection = readModelDb.GetCollection<HandleReadModel>(collectionName);
    }

    /// <summary>All read models, ordered by handle.</summary>
    public IQueryable<HandleReadModel> AllSortedByHandle
    {
        get
        {
            return _collection.AsQueryable().OrderBy(x => x.Handle);
        }
    }

    /// <summary>Creates the index on (Handle, CreatetAt).</summary>
    public void Init()
    {
        var keys = IndexKeys<HandleReadModel>.Ascending(x => x.Handle, x => x.CreatetAt);
        _collection.CreateIndex(keys);
    }

    /// <summary>Drops the whole collection.</summary>
    public void Drop()
    {
        _collection.Drop();
    }

    /// <summary>Inserts a read model that only carries its handle.</summary>
    public void Create(DocumentHandle documentHandle)
    {
        var model = new HandleReadModel(documentHandle);
        _collection.Insert(model);
    }

    /// <summary>Looks up a single read model by its handle.</summary>
    public HandleReadModel FindOneById(DocumentHandle handle)
    {
        var key = BsonValue.Create(handle);
        return _collection.FindOneById(key);
    }

    /// <summary>
    /// Upserts the document for <paramref name="handle"/>, setting DocumentId,
    /// CreatetAt and FileName.
    /// </summary>
    public void Promise(DocumentHandle handle, FileNameWithExtension fileName, DocumentId id, long createdAt)
    {
        var byHandle = Query<HandleReadModel>.EQ(x => x.Handle, handle);
        var promised = Update<HandleReadModel>
            .Set(x => x.DocumentId, id)
            .Set(x => x.CreatetAt, createdAt)
            .Set(x => x.FileName, fileName);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = byHandle,
            Update = promised,
            Upsert = true
        });
    }

    /// <summary>
    /// Sets DocumentId and ProjectedAt on the document for <paramref name="handle"/>,
    /// but only when its CreatetAt is not greater than <paramref name="projectedAt"/>.
    /// No upsert is requested.
    /// </summary>
    public void ConfirmLink(DocumentHandle handle, DocumentId id, long projectedAt)
    {
        var confirmed = Update<HandleReadModel>
            .Set(x => x.DocumentId, id)
            .Set(x => x.ProjectedAt, projectedAt);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = HandleCreatedUpTo(handle, projectedAt),
            Update = confirmed
        });
    }

    /// <summary>Sets CustomData on the document for <paramref name="handle"/>.</summary>
    public void UpdateCustomData(DocumentHandle handle, HandleCustomData customData)
    {
        var byHandle = Query.And(
            Query<HandleReadModel>.EQ(x => x.Handle, handle)
        );
        var changed = Update<HandleReadModel>.Set(x => x.CustomData, customData);

        _collection.FindAndModify(new FindAndModifyArgs
        {
            Query = byHandle,
            Update = changed
        });
    }

    /// <summary>
    /// Removes the document for <paramref name="handle"/> when its CreatetAt is not
    /// greater than <paramref name="projectedAt"/>.
    /// </summary>
    public void Delete(DocumentHandle handle, long projectedAt)
    {
        /* TODO: handle Delete promise */
        _collection.FindAndRemove(new FindAndRemoveArgs
        {
            Query = HandleCreatedUpTo(handle, projectedAt)
        });
    }

    // Filter shared by ConfirmLink and Delete: same handle AND CreatetAt <= checkpoint.
    static IMongoQuery HandleCreatedUpTo(DocumentHandle handle, long checkpoint)
    {
        return Query.And(
            Query<HandleReadModel>.EQ(x => x.Handle, handle),
            Query<HandleReadModel>.LTE(x => x.CreatetAt, checkpoint)
        );
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Jarvis.DocumentStore.Core.Domain.Document; | |
using Jarvis.DocumentStore.Core.Domain.Document.Events; | |
using Jarvis.DocumentStore.Core.ReadModel; | |
using NEventStore; | |
namespace Jarvis.DocumentStore.Core.EvenstoreHooks | |
{ | |
/// <summary>
/// NEventStore pipeline hook that, right after a Document commit containing a
/// <see cref="DocumentCreated"/> event, writes a "promise" to the handle read model
/// stamped with the commit's checkpoint.
/// </summary>
public class ReadModelPromisesHook : PipelineHookBase
{
    private const string AggregateTypeHeader = "AggregateType";

    private static readonly string DocumentTypeName = typeof(Document).FullName;

    private readonly IHandleWriter _handleWriter;

    public ReadModelPromisesHook(IHandleWriter handleWriter)
    {
        _handleWriter = handleWriter;
    }

    /// <summary>
    /// Writes the promise for the first <see cref="DocumentCreated"/> event in the
    /// commit. Commits without an "AggregateType" header, or whose header does not
    /// name the Document aggregate, are ignored.
    /// </summary>
    /// <param name="committed">The commit that has just been persisted.</param>
    public override void PostCommit(ICommit committed)
    {
        // Single lookup instead of ContainsKey + indexer.
        object headerValue;
        if (!committed.Headers.TryGetValue(AggregateTypeHeader, out headerValue))
        {
            return;
        }

        // 'as' instead of a hard cast: a non-string header is treated as
        // "not a Document commit" rather than throwing from the hook.
        var aggregateType = headerValue as string;
        if (!string.Equals(aggregateType, DocumentTypeName, StringComparison.Ordinal))
        {
            return;
        }

        var docCreated = committed.Events
            .Select(x => x.Body)
            .OfType<DocumentCreated>()
            .FirstOrDefault();

        if (docCreated == null)
        {
            return;
        }

        _handleWriter.Promise(
            docCreated.HandleInfo.Handle,
            docCreated.HandleInfo.FileName,
            (DocumentId)docCreated.AggregateId,
            LongCheckpoint.Parse(committed.CheckpointToken).LongValue
        );
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment