Skip to content

Instantly share code, notes, and snippets.

@JamesTryand
Created November 20, 2012 18:48
Show Gist options
  • Select an option

  • Save JamesTryand/4120073 to your computer and use it in GitHub Desktop.

Select an option

Save JamesTryand/4120073 to your computer and use it in GitHub Desktop.
Observable Bus
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Infrastructure.Domain;
using System.Data.SqlClient;
using System.Reactive.Subjects;
using System.Reactive.Linq;
namespace Infrastructure.ScratchDomain
{
public class ObservableBus<X> : ICommandSender, IEventPublisher, IServiceBus, IObservable<Message>, IObserver<Message> where X : ISubject<Message>, new()
{
private readonly Dictionary<Type, List<Action<Message>>> routes = new Dictionary<Type, List<Action<Message>>>();
private readonly Dictionary<Type, X> streams = new Dictionary<Type, X>();
private readonly IObservable<Message> allstreams;
public void OnCompleted<T>()
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
return;
}
stream.OnCompleted();
}
public void OnError<T>(Exception error)
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
return;
}
stream.OnError(error);
}
public void OnNext<T>(T value)
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
return;
}
stream.OnNext(value as Message);
}
// Not entirely happy about this, but it will
// call oncompleted against all streams;
public void OnCompleted()
{
foreach (var stream in streams.Values)
{
stream.OnCompleted();
}
}
// Not entirely happy about this, but it will
// raise onerror against all streams;
public void OnError(Exception error)
{
foreach (var stream in streams.Values)
{
stream.OnError(error);
}
}
// this only works against the type that it matches;
public void OnNext(Message value)
{
Type t = value.GetType();
X stream;
if (!streams.TryGetValue(t, out stream))
{
return;
}
stream.OnNext(value);
}
public IDisposable Subscribe<T>(IObserver<T> observer) where T : Message
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
stream = new X();
streams.Add(typeof(T), stream);
}
return stream.Subscribe(observer as IObserver<Message>);
}
public IDisposable Subscribe(IObserver<Message> observer)
{
return Observable.Merge<Message>(streams.Values as IEnumerable<IObservable<Message>>).Subscribe(observer);
}
public void RegisterHandler<T>(Action<T> handler) where T : Message
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
stream = new X();
streams.Add(typeof(T), stream);
}
stream.Subscribe(DelegateAdjuster.CastArgument<Message, T>(x => handler(x)));
}
public void Send<T>(T command) where T : Command
{
X stream;
if (streams.TryGetValue(typeof(T), out stream))
{
stream.OnNext(command);
}
else
{
throw new InvalidOperationException(string.Format("No handler/subscription registered for {0}", typeof(T).FullName));
}
}
public void Publish<T>(T @event) where T : Event
{
X stream;
if (!streams.TryGetValue(typeof(T), out stream))
{
return;
}
stream.OnNext(@event);
}
}
public class ReplayBus : ObservableBus<ReplaySubject<Message>> { }
public class BehaviorBus : ObservableBus<BehaviorSubject<Message>> { }
public class AsyncObservableBus : ObservableBus<AsyncSubject<Message>> { }
public class ObservableBus : ObservableBus<Subject<Message>> { }
public class MXBus : IServiceBus
{
private IServiceBus Bus1;
private IServiceBus Bus2;
public MXBus(IServiceBus bus1, IServiceBus bus2)
{
Bus1 = bus1;
Bus2 = bus2;
}
public void RegisterHandler<T>(Action<T> handler) where T : Message
{
Bus1.RegisterHandler(handler);
Bus2.RegisterHandler(handler);
}
public void Send<T>(T command) where T : Command
{
Bus1.Send(command);
Bus2.Send(command);
}
public void Publish<T>(T command) where T : Event
{
Bus1.Publish(command);
Bus2.Publish(command);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment