Last active
November 6, 2015 16:19
-
-
Save rightfold/7ed6d07f824e8503e4c4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
/// <summary>
/// Wraps the <c>Read</c> event of an <see cref="IPoller"/> in a published (shared) observable
/// and exposes it through one <see cref="RateAdresses"/> instance per update rate.
/// Disposing this object disposes the connection to the published observable, which in turn
/// detaches the handler from the poller's <c>Read</c> event.
/// </summary>
internal sealed class ObservableAdresses : IDisposable
{
    private readonly IDisposable _connection;
    private bool _disposed;

    /// <summary>
    /// Attaches to <paramref name="poller"/> and creates the fast and slow address groups.
    /// </summary>
    /// <param name="poller">The poller whose <c>Read</c> event feeds both address groups.</param>
    /// <exception cref="ArgumentNullException"><paramref name="poller"/> is <c>null</c>.</exception>
    public ObservableAdresses(IPoller poller)
    {
        // Fail fast: without this check a null poller would only surface later,
        // inside the subscribe lambda below.
        if (poller == null)
        {
            throw new ArgumentNullException(nameof(poller));
        }

        var onEvent = Observable.Create<IAddressAndValue>(o =>
        {
            EventHandler<IAddressAndValue> handler = (_, e) => o.OnNext(e);
            poller.Read += handler;
            // Disposing the subscription detaches the handler again.
            return Disposable.Create(() => poller.Read -= handler);
        }).Publish();

        // Connect immediately so there is exactly one handler attached to the poller,
        // shared by every downstream subscriber.
        _connection = onEvent.Connect();

        FastAdresses = new RateAdresses(onEvent, UpdateRate.High);
        SlowAdresses = new RateAdresses(onEvent, UpdateRate.Low);
    }

    /// <summary>Addresses observed with <see cref="UpdateRate.High"/>.</summary>
    public RateAdresses FastAdresses { get; }

    /// <summary>Addresses observed with <see cref="UpdateRate.Low"/>.</summary>
    public RateAdresses SlowAdresses { get; }

    /// <summary>
    /// Disposes the connection to the published poller observable.
    /// Calling this more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _connection.Dispose();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.Immutable; | |
using System.Reactive.Linq; | |
/// <summary>
/// Keeps one shared, reference-counted observable per observed address for a single
/// <see cref="UpdateRate"/>, and raises <see cref="AdressesChanged"/> whenever the set of
/// tracked addresses changes.
/// </summary>
internal sealed class RateAdresses
{
    private readonly IObservable<IAddressAndValue> _onEvent;
    private readonly object _gate = new object();

    // Replaced (never mutated in place) under _gate; each instance is an immutable snapshot.
    private ImmutableDictionary<IAddress, IObservable<IAddressAndValue>> _adressObservables =
        ImmutableDictionary<IAddress, IObservable<IAddressAndValue>>.Empty;

    /// <summary>Creates the address group for <paramref name="rate"/>.</summary>
    /// <param name="onEvent">Stream of all address/value events; filtered per address on demand.</param>
    /// <param name="rate">The update rate this group represents.</param>
    /// <exception cref="ArgumentNullException"><paramref name="onEvent"/> is <c>null</c>.</exception>
    internal RateAdresses(IObservable<IAddressAndValue> onEvent, UpdateRate rate)
    {
        if (onEvent == null)
        {
            throw new ArgumentNullException(nameof(onEvent));
        }

        Rate = rate;
        _onEvent = onEvent;
    }

    /// <summary>
    /// Raised after an address has been added to or removed from the tracked set.
    /// The event argument is the set of tracked addresses right after that change.
    /// </summary>
    internal event EventHandler<IEnumerable<IAddress>> AdressesChanged;

    /// <summary>The update rate this group was created with.</summary>
    public UpdateRate Rate { get; }

    /// <summary>
    /// Returns an observable of the values read for <paramref name="address"/>.
    /// The address is only registered once the returned observable is subscribed to.
    /// </summary>
    /// <typeparam name="T">Value type of the address.</typeparam>
    /// <param name="address">The address to observe.</param>
    /// <exception cref="ArgumentNullException"><paramref name="address"/> is <c>null</c>.</exception>
    internal IObservable<IAddressAndValue<T>> Observe<T>(IAddress<T> address)
    {
        if (address == null)
        {
            throw new ArgumentNullException(nameof(address));
        }

        // Defer so that the lookup/registration happens per subscription, not per call.
        return Observable.Defer(() => GetOrCreateObservable(address));
    }

    /// <summary>
    /// Looks up the shared observable for <paramref name="address"/>, creating and
    /// registering it if it does not exist yet.
    /// </summary>
    private IObservable<IAddressAndValue<T>> GetOrCreateObservable<T>(IAddress<T> address)
    {
        IObservable<IAddressAndValue> observable;

        // Unlocked first check; the dictionary instance itself is immutable, and the
        // check is repeated under the lock before anything is added.
        if (!_adressObservables.TryGetValue(address, out observable))
        {
            IEnumerable<IAddress> keysAfterAdd = null;

            lock (_gate)
            {
                if (!_adressObservables.TryGetValue(address, out observable))
                {
                    // Finally is placed BEFORE Publish().RefCount() on purpose: there it runs
                    // when the shared connection to _onEvent is torn down (i.e. RefCount drops
                    // its connection after the last subscriber left, or the source terminates).
                    // Placed after RefCount it would run for every single subscription that
                    // ends and remove the address while other subscribers are still attached.
                    observable = _onEvent.OfType<IAddressAndValue>()
                        .Where(x => Equals(x.Address, address))
                        .Finally(() => Remove(address))
                        .Publish()
                        .RefCount();

                    _adressObservables = _adressObservables.Add(address, observable);
                    keysAfterAdd = _adressObservables.Keys;
                }
            }

            // Raise the event outside the lock so handlers cannot block other callers.
            if (keysAfterAdd != null)
            {
                OnAdressesChanged(keysAfterAdd);
            }
        }

        // NOTE(review): Distinct suppresses every value that was EVER seen on this
        // subscription (and remembers all of them). If the intent is "only emit when the
        // value changes", DistinctUntilChanged would be the matching operator - confirm.
        // NOTE(review): a subscriber that obtained `observable` just before the last other
        // subscriber left may reconnect after Remove has run - confirm whether that matters.
        return observable.OfType<IAddressAndValue<T>>()
            .Distinct(x => x.Value);
    }

    /// <summary>
    /// Removes <paramref name="address"/> from the tracked set and raises
    /// <see cref="AdressesChanged"/>. Does nothing if the address is not tracked.
    /// </summary>
    private void Remove(IAddress address)
    {
        IEnumerable<IAddress> keysAfterRemove;

        lock (_gate)
        {
            if (!_adressObservables.ContainsKey(address))
            {
                // Nothing changed, so there is nothing to announce.
                return;
            }

            _adressObservables = _adressObservables.Remove(address);
            keysAfterRemove = _adressObservables.Keys;
        }

        OnAdressesChanged(keysAfterRemove);
    }

    /// <summary>
    /// Raises <see cref="AdressesChanged"/> with the given snapshot of tracked addresses.
    /// </summary>
    /// <param name="addresses">Keys captured under the lock, so they match the change being announced.</param>
    private void OnAdressesChanged(IEnumerable<IAddress> addresses)
    {
        AdressesChanged?.Invoke(this, addresses);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment