Created
June 20, 2011 16:31
-
-
Save abdullin/1035945 to your computer and use it in GitHub Desktop.
Sample of message dispatcher that routes and records messages (for Lokad.CQRS v2.0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// this dispatchers routes all incoming messages between 2 queues (commands/events) | |
// and also records all messages into a tape storage. | |
// fore registration sample see https://gist.github.com/1035950 | |
public sealed class RoutingDispatcher : ISingleThreadMessageDispatcher | |
{ | |
readonly IDictionary<string, IQueueWriter> _routes = new Dictionary<string, IQueueWriter>(); | |
readonly QueueWriterRegistry _factories; | |
readonly string _endpoint; | |
readonly ITapeWriter _writer; | |
readonly IEnvelopeStreamer _streamer; | |
public RoutingDispatcher(string endpoint, QueueWriterRegistry factories, IEnvelopeStreamer streamer, ITa | |
{ | |
_factories = factories; | |
_writer = writer; | |
_streamer = streamer; | |
_endpoint = endpoint; | |
} | |
public void DispatchMessage(ImmutableEnvelope envelope) | |
{ | |
Record(envelope); | |
RouteToProcessor(envelope); | |
} | |
void Record(ImmutableEnvelope message) | |
{ | |
var buffer = _streamer.SaveEnvelopeData(message); | |
_writer.Append("router-log", buffer); | |
} | |
public void Init() | |
{ | |
} | |
IQueueWriter GetRoute(string name) | |
{ | |
IQueueWriter value; | |
if (_routes.TryGetValue(name, out value)) | |
{ | |
return value; | |
} | |
IQueueWriterFactory factory; | |
if (_factories.TryGet(_endpoint, out factory)) | |
{ | |
value = factory.GetWriteQueue(name); | |
_routes.Add(name, value); | |
return value; | |
} | |
// bingo | |
var message = string.Format("Failed to load factory for route '{0}:{1}'", _endpoint, name); | |
throw new InvalidOperationException(message); | |
} | |
void RouteToProcessor(ImmutableEnvelope message) | |
{ | |
if (message.Items.All(i => i.Content is IDomainEvent)) | |
{ | |
GetRoute(IdFor.Events).PutMessage(message); | |
return; | |
} | |
if (message.Items.All(i => i.Content is IDomainCommand)) | |
{ | |
GetRoute(IdFor.Commands).PutMessage(message); | |
return; | |
} | |
throw new InvalidOperationException("Unexpected envelope contents"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment