Skip to content

Instantly share code, notes, and snippets.

@ToJans
Created October 22, 2010 10:39
Show Gist options
  • Save ToJans/640316 to your computer and use it in GitHub Desktop.
Save ToJans/640316 to your computer and use it in GitHub Desktop.
// POC CQRS bus for stateless synchronous commandhandlers
// and both both syncronous and asynchronous stateless eventhandlers
// Remarks : ToJans@twitter
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using Hotco.Interfaces;
namespace Hotco.Core.CQRS.Services
{
public class CQRSMessageBus : IMessageBus
{
IHandlerFactory handlerfactory;
ILog Log;
public CQRSMessageBus(IHandlerFactory handlerfactory,ILog Log)
{
this.handlerfactory = handlerfactory;
this.Log = Log;
}
public void HandleCommand(ICommand cmd)
{
RunJob(IsBackground(cmd),delegate { HandleCommandInternal(cmd);});
}
public void HandleEvent(IEvent e)
{
RunJob(IsBackground(e),delegate {HandleEventInternal(e);});
}
bool IsBackground(object msg)
{
return (msg is ISupportsBackground) && (msg as ISupportsBackground).RunInBackground;
}
void RunJob(bool Inbackground,Action act)
{
if (Inbackground)
{
ThreadPool.QueueUserWorkItem(
delegate {
try
{
act();
}
catch(Exception e)
{
Log.WriteLine(()=>"Error in background task:"+e.ToString());
}
});
}
else
{
act();
}
}
void HandleCommandInternal(ICommand command)
{
var msgs = new List<IMessage>();
msgs.Add(command);
do
{
var msg = msgs[0];
msgs.RemoveAt(0);
System.Collections.IEnumerable evs;
try
{
evs = handlerfactory.Handle(msg).OfType<IMessage>();
}
catch(Exception)
{
Log.WriteLine(() => "Error handling "+msg.GetType().FullName).DumpObject(msg);
throw;
}
foreach(var ev in evs.OfType<IEvent>())
{
msgs.Add(ev);
}
} while (msgs.Count>0);
}
void HandleEventInternal(IEvent e)
{
handlerfactory.Handle(e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment