Created
November 20, 2012 18:48
-
-
Save JamesTryand/4120073 to your computer and use it in GitHub Desktop.
Observable Bus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Text; | |
| using Infrastructure.Domain; | |
| using System.Data.SqlClient; | |
| using System.Reactive.Subjects; | |
| using System.Reactive.Linq; | |
| namespace Infrastructure.ScratchDomain | |
| { | |
/// <summary>
/// A bus that keeps one subject of type <typeparamref name="X"/> per message type and
/// routes commands, events and raw <see cref="Message"/> notifications to the subject
/// registered for that type.
/// </summary>
/// <typeparam name="X">
/// The subject implementation created (via its parameterless constructor) for each
/// message type the first time a handler or observer is registered for it.
/// </typeparam>
/// <remarks>
/// Streams are only created by <see cref="Subscribe{T}"/> and <see cref="RegisterHandler{T}"/>.
/// No locking is performed around the stream dictionary.
/// </remarks>
public class ObservableBus<X> : ICommandSender, IEventPublisher, IServiceBus, IObservable<Message>, IObserver<Message> where X : ISubject<Message>, new()
{
    // One subject per message type, created lazily by GetOrAddStream.
    private readonly Dictionary<Type, X> streams = new Dictionary<Type, X>();

    /// <summary>
    /// Completes the stream registered for <typeparamref name="T"/>; does nothing when
    /// no stream exists for that type.
    /// </summary>
    public void OnCompleted<T>()
    {
        X stream;
        if (streams.TryGetValue(typeof(T), out stream))
        {
            stream.OnCompleted();
        }
    }

    /// <summary>
    /// Raises <paramref name="error"/> on the stream registered for <typeparamref name="T"/>;
    /// does nothing when no stream exists for that type.
    /// </summary>
    /// <param name="error">The error to forward to the stream.</param>
    public void OnError<T>(Exception error)
    {
        X stream;
        if (streams.TryGetValue(typeof(T), out stream))
        {
            stream.OnError(error);
        }
    }

    /// <summary>
    /// Pushes <paramref name="value"/> into the stream registered for the static type
    /// <typeparamref name="T"/>; does nothing when no stream exists for that type.
    /// </summary>
    /// <param name="value">The message to push; it is forwarded as a <see cref="Message"/>.</param>
    public void OnNext<T>(T value)
    {
        X stream;
        if (streams.TryGetValue(typeof(T), out stream))
        {
            stream.OnNext(value as Message);
        }
    }

    // Not entirely happy about this, but it will
    // call oncompleted against all streams;
    /// <summary>Completes every stream currently registered on the bus.</summary>
    public void OnCompleted()
    {
        // Iterate a snapshot: an observer reacting to completion may register a new
        // handler, which would otherwise invalidate the dictionary enumerator.
        foreach (var stream in streams.Values.ToList())
        {
            stream.OnCompleted();
        }
    }

    // Not entirely happy about this, but it will
    // raise onerror against all streams;
    /// <summary>Raises <paramref name="error"/> on every stream currently registered on the bus.</summary>
    /// <param name="error">The error to forward to each stream.</param>
    public void OnError(Exception error)
    {
        // Snapshot for the same re-entrancy reason as OnCompleted().
        foreach (var stream in streams.Values.ToList())
        {
            stream.OnError(error);
        }
    }

    // this only works against the type that it matches;
    /// <summary>
    /// Pushes <paramref name="value"/> into the stream registered for its exact runtime
    /// type; does nothing when no stream exists for that type.
    /// </summary>
    /// <param name="value">The message to route.</param>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
    public void OnNext(Message value)
    {
        if (value == null)
        {
            throw new ArgumentNullException("value");
        }

        X stream;
        if (streams.TryGetValue(value.GetType(), out stream))
        {
            stream.OnNext(value);
        }
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the stream for <typeparamref name="T"/>,
    /// creating that stream if it does not exist yet.
    /// </summary>
    /// <param name="observer">The observer that receives messages of type <typeparamref name="T"/>.</param>
    /// <returns>A handle that ends the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe<T>(IObserver<T> observer) where T : Message
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        // IObserver<in T> is contravariant, so an IObserver<T> cannot be treated as an
        // IObserver<Message>; project the stream down to T instead of casting the observer.
        IObservable<Message> source = GetOrAddStream(typeof(T));
        return source.OfType<T>().Subscribe(observer);
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the merge of all streams that exist at
    /// the time of the call.
    /// </summary>
    /// <remarks>
    /// Streams created after this call are not part of the subscription. The stream set
    /// is captured as a list before being handed to <c>Observable.Merge</c>.
    /// </remarks>
    /// <param name="observer">The observer that receives every message.</param>
    /// <returns>A handle that ends the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        // Build the sequence explicitly rather than relying on an 'as' cast of the
        // dictionary's value collection, which yields null when X is a value type.
        var sources = new List<IObservable<Message>>(streams.Count);
        foreach (var stream in streams.Values)
        {
            sources.Add(stream);
        }

        return Observable.Merge<Message>(sources).Subscribe(observer);
    }

    /// <summary>
    /// Registers <paramref name="handler"/> to be invoked for messages pushed into the
    /// stream for <typeparamref name="T"/>, creating that stream if needed.
    /// </summary>
    /// <param name="handler">The callback to invoke for each message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterHandler<T>(Action<T> handler) where T : Message
    {
        if (handler == null)
        {
            throw new ArgumentNullException("handler");
        }

        X stream = GetOrAddStream(typeof(T));
        stream.Subscribe(DelegateAdjuster.CastArgument<Message, T>(x => handler(x)));
    }

    /// <summary>
    /// Sends <paramref name="command"/> to the stream registered for the static type
    /// <typeparamref name="T"/>.
    /// </summary>
    /// <param name="command">The command to dispatch.</param>
    /// <exception cref="InvalidOperationException">No stream is registered for <typeparamref name="T"/>.</exception>
    public void Send<T>(T command) where T : Command
    {
        X stream;
        if (!streams.TryGetValue(typeof(T), out stream))
        {
            throw new InvalidOperationException(string.Format("No handler/subscription registered for {0}", typeof(T).FullName));
        }

        stream.OnNext(command);
    }

    /// <summary>
    /// Publishes <paramref name="event"/> to the stream registered for the static type
    /// <typeparamref name="T"/>; does nothing when no stream exists for that type.
    /// </summary>
    /// <param name="event">The event to publish.</param>
    public void Publish<T>(T @event) where T : Event
    {
        X stream;
        if (streams.TryGetValue(typeof(T), out stream))
        {
            stream.OnNext(@event);
        }
    }

    // Returns the stream registered for messageType, creating and registering one if absent.
    private X GetOrAddStream(Type messageType)
    {
        X stream;
        if (!streams.TryGetValue(messageType, out stream))
        {
            stream = new X();
            streams.Add(messageType, stream);
        }

        return stream;
    }
}
/// <summary>
/// An <see cref="ObservableBus{X}"/> whose per-message-type streams are
/// <c>ReplaySubject&lt;Message&gt;</c> instances.
/// </summary>
public class ReplayBus : ObservableBus<ReplaySubject<Message>>
{
}
/// <summary>
/// A default-constructible <c>ISubject&lt;Message&gt;</c> that delegates to a
/// <c>BehaviorSubject&lt;Message&gt;</c>.
/// </summary>
/// <remarks>
/// <c>BehaviorSubject&lt;T&gt;</c> is sealed and only offers a constructor taking an
/// initial value, so it cannot satisfy the <c>new()</c> constraint of
/// <see cref="ObservableBus{X}"/> directly. This adapter seeds the inner subject with
/// null and hides that placeholder from subscribers.
/// </remarks>
public sealed class DefaultBehaviorSubject : ISubject<Message>
{
    // Seeded with null because no meaningful initial message exists at construction time.
    private readonly BehaviorSubject<Message> inner = new BehaviorSubject<Message>(null);

    /// <summary>Forwards completion to the inner subject.</summary>
    public void OnCompleted()
    {
        inner.OnCompleted();
    }

    /// <summary>Forwards <paramref name="error"/> to the inner subject.</summary>
    /// <param name="error">The error to forward.</param>
    public void OnError(Exception error)
    {
        inner.OnError(error);
    }

    /// <summary>Forwards <paramref name="value"/> to the inner subject.</summary>
    /// <param name="value">The message to forward.</param>
    public void OnNext(Message value)
    {
        inner.OnNext(value);
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the inner subject, skipping null
    /// messages so the placeholder seed never reaches a handler.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A handle that ends the subscription when disposed.</returns>
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return inner.Where(message => message != null).Subscribe(observer);
    }
}

/// <summary>
/// An <see cref="ObservableBus{X}"/> whose per-message-type streams are backed by a
/// <c>BehaviorSubject&lt;Message&gt;</c> (through <see cref="DefaultBehaviorSubject"/>).
/// </summary>
public class BehaviorBus : ObservableBus<DefaultBehaviorSubject>
{
}
/// <summary>
/// An <see cref="ObservableBus{X}"/> whose per-message-type streams are
/// <c>AsyncSubject&lt;Message&gt;</c> instances.
/// </summary>
public class AsyncObservableBus : ObservableBus<AsyncSubject<Message>>
{
}
/// <summary>
/// The default <see cref="ObservableBus{X}"/>, whose per-message-type streams are
/// plain <c>Subject&lt;Message&gt;</c> instances.
/// </summary>
public class ObservableBus : ObservableBus<Subject<Message>>
{
}
/// <summary>
/// An <see cref="IServiceBus"/> that forwards every registration, command and event
/// to two underlying buses, always the first bus and then the second.
/// </summary>
public class MXBus : IServiceBus
{
    private readonly IServiceBus first;
    private readonly IServiceBus second;

    /// <summary>Creates a bus that fans out to <paramref name="bus1"/> and then <paramref name="bus2"/>.</summary>
    /// <param name="bus1">The bus invoked first.</param>
    /// <param name="bus2">The bus invoked second.</param>
    /// <exception cref="ArgumentNullException">Either bus is null.</exception>
    public MXBus(IServiceBus bus1, IServiceBus bus2)
    {
        // Fail fast: a null bus would otherwise surface later as a NullReferenceException,
        // possibly after the other bus had already processed the call.
        if (bus1 == null)
        {
            throw new ArgumentNullException("bus1");
        }

        if (bus2 == null)
        {
            throw new ArgumentNullException("bus2");
        }

        first = bus1;
        second = bus2;
    }

    /// <summary>Registers <paramref name="handler"/> on both underlying buses.</summary>
    /// <param name="handler">The handler to register.</param>
    public void RegisterHandler<T>(Action<T> handler) where T : Message
    {
        first.RegisterHandler(handler);
        second.RegisterHandler(handler);
    }

    /// <summary>
    /// Sends <paramref name="command"/> to both underlying buses. If the first bus
    /// throws, the second bus is not invoked.
    /// </summary>
    /// <param name="command">The command to send.</param>
    public void Send<T>(T command) where T : Command
    {
        first.Send(command);
        second.Send(command);
    }

    /// <summary>
    /// Publishes the event to both underlying buses. If the first bus throws, the
    /// second bus is not invoked.
    /// </summary>
    /// <param name="command">The event to publish (the parameter name is kept for compatibility with existing callers).</param>
    public void Publish<T>(T command) where T : Event
    {
        first.Publish(command);
        second.Publish(command);
    }
}
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment