@rbirkby
Last active December 2, 2021 14:03
Reactive Extensions notes

Notes on Reactive Extensions .Net

Written for Rx.Net 2.2.5 (released July 2014)

Observable.Return()

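Generates an observable from a single item. The argument is evaluated once, when Return() is called, so every subscription receives the same item.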

var obs = Observable.Return(rnd.Next());

obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42

Observable.Start()

Generates an observable from a single item asynchronously. The item is generated immediately, not on subscription. Therefore, each subscription consumes the same item.

var obs = Observable.Start<int>(() => rnd.Next());

obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42

Observable.Defer()

Generates a new observable that defers calling the given observable factory until a subscription is made. The factory runs once per subscription, so each subscription consumes new items.

var obs = Observable.Defer(() => Observable.Return(rnd.Next()));

obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 1984
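
The difference between Return() and Defer() is easier to see with repeatable values. The sketch below assumes the Rx.Net assemblies are referenced and swaps rnd.Next() for a hypothetical counter, NextId(), which is not part of the original notes:

```csharp
using System;
using System.Reactive.Linq;

static class DeferDemo
{
    static int counter;

    // Hypothetical stand-in for rnd.Next(): returns 1, 2, 3, ... so the effect is repeatable.
    static int NextId() => ++counter;

    static void Main()
    {
        // NextId() runs once, right here, before anyone subscribes.
        var eager = Observable.Return(NextId());
        eager.Subscribe(x => Console.WriteLine("eager: " + x));       // eager: 1
        eager.Subscribe(x => Console.WriteLine("eager: " + x));       // eager: 1

        // The factory runs once per subscription, so NextId() is called each time.
        var deferred = Observable.Defer(() => Observable.Return(NextId()));
        deferred.Subscribe(x => Console.WriteLine("deferred: " + x)); // deferred: 2
        deferred.Subscribe(x => Console.WriteLine("deferred: " + x)); // deferred: 3
    }
}
```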

IObservable.Publish()

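Returns a connectable observable whose subscribers all share a single subscription to the source, and therefore see the same published values. Nothing is emitted until Connect() is called.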

var obs = Observable
    .Defer(() => Observable.Start(() => rnd.Next()))
    .Publish();

obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42
obs.Connect();

IObservable.Replay()

Subscriptions made after the connectable observable has been connected still see all items, because Replay() buffers them.

var obs = Observable
    .Range(1, 3)
    .Replay();

obs.Connect(); // Connect before subscribing
obs.Subscribe(Console.WriteLine); // => {1,2,3}
obs.Subscribe(Console.WriteLine); // => {1,2,3}
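
To contrast this with Publish(), the following sketch subscribes late to both kinds of connectable observable. It assumes the Rx.Net assemblies are referenced and that it runs from a plain console Main, where Range() finishes inside Connect():

```csharp
using System;
using System.Reactive.Linq;

static class ReplayDemo
{
    static void Main()
    {
        // Publish(): a subscriber that arrives after Connect() misses earlier items.
        var published = Observable.Range(1, 3).Publish();
        published.Connect(); // Range runs to completion here
        published.Subscribe(
            x => Console.WriteLine("publish: " + x),
            () => Console.WriteLine("publish: completed")); // only the completion arrives

        // Replay(): the same late subscriber still gets every buffered item.
        var replayed = Observable.Range(1, 3).Replay();
        replayed.Connect();
        replayed.Subscribe(
            x => Console.WriteLine("replay: " + x),         // replay: 1, 2, 3
            () => Console.WriteLine("replay: completed"));
    }
}
```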

Custom IObservable

Implement the IObservable<T> interface, which has a single method, Subscribe(IObserver<T>). The Subscribe() extension methods that take delegates wrap them in an observer and call this method.

class IntObservable : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(42);
        observer.OnCompleted();
        return Disposable.Create(() => { });
    }
}


new IntObservable().Subscribe(Console.WriteLine); // => 42

Custom IObservable using Observable.Create()

Observable.Create() takes a subscribe function and wraps an IObservable around it.

Observable.Create<int>(observer => {
    observer.OnNext(42);
    observer.OnCompleted();
    return Disposable.Empty;
}).Subscribe(Console.WriteLine); // => 42

But be careful not to throw exceptions from the subscribe function. Don't do this:

var brokenObservable = Observable.Create<int>(observer => {
    throw new Exception();
    return Disposable.Empty;
});

Observable.Return(42)
    .Concat(brokenObservable)
    .Subscribe(onNext: Console.WriteLine, onError: Console.Error.WriteLine);

Instead, report the failure by calling observer.OnError(), so that it reaches the subscriber's onError handler.
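
A minimal sketch of the safer pattern, assuming the Rx.Net assemblies are referenced (the exception type and message are illustrative only):

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class CreateErrorDemo
{
    static void Main()
    {
        // Report the failure through the observer rather than throwing.
        var failing = Observable.Create<int>(observer =>
        {
            observer.OnError(new InvalidOperationException("boom"));
            return Disposable.Empty;
        });

        Observable.Return(42)
            .Concat(failing)
            .Subscribe(
                x => Console.WriteLine("next: " + x),
                ex => Console.WriteLine("error: " + ex.Message));
        // next: 42
        // error: boom
    }
}
```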

Subscribing to exceptions

Observable.Throw<int>(new Exception())
    .Subscribe(Console.WriteLine, onError: Console.Error.WriteLine); // => System.Exception: Exception of type 'System.Exception' was thrown.

IObservable.Catch()

Catches any exception and continues the stream with a new observable sequence.

Observable.Throw<int>(new Exception())
    .Catch(Observable.Return(42))
    .Subscribe(Console.WriteLine); // => 42
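
Catch() also has an overload that takes a handler for one specific exception type, which avoids swallowing unrelated errors. A small sketch, assuming the Rx.Net assemblies are referenced (the choice of TimeoutException and the fallback value -1 are illustrative):

```csharp
using System;
using System.Reactive.Linq;

static class CatchDemo
{
    static void Main()
    {
        Observable.Throw<int>(new TimeoutException("too slow"))
            // Only TimeoutException is handled; other exception types still reach onError.
            .Catch((TimeoutException ex) =>
            {
                Console.WriteLine("recovering from: " + ex.Message);
                return Observable.Return(-1);
            })
            .Subscribe(
                x => Console.WriteLine("next: " + x),
                ex => Console.WriteLine("error: " + ex.Message));
        // recovering from: too slow
        // next: -1
    }
}
```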

IObservable.Retry()

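If an error occurs, resubscribes to the sequence. The argument is the total number of subscription attempts (the first one included), so Retry(2) allows one retry. Retrying stops early if the sequence completes.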

bool first = true;

Observable.Create<int>(observer => {
        observer.OnNext(rnd.Next());
    
        if (first)
            observer.OnError(new Exception());
        else 
            observer.OnNext(42);
        first = !first;   

        return Disposable.Empty;
    })
    .Retry(2)
    .Subscribe(Console.WriteLine); // => {1984,2038,42}
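
To check how the count passed to Retry() is interpreted, this sketch counts subscriptions to a source that always fails. It assumes the Rx.Net assemblies are referenced. The names attempts and alwaysFails are hypothetical:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class RetryDemo
{
    static void Main()
    {
        int attempts = 0;

        // A source that fails on every subscription and counts how often it is subscribed to.
        var alwaysFails = Observable.Create<int>(observer =>
        {
            attempts++;
            observer.OnError(new Exception("attempt " + attempts + " failed"));
            return Disposable.Empty;
        });

        alwaysFails
            .Retry(3)
            .Subscribe(
                x => Console.WriteLine("next: " + x),
                ex => Console.WriteLine("gave up: " + ex.Message)); // gave up: attempt 3 failed

        Console.WriteLine("subscriptions: " + attempts);            // subscriptions: 3
    }
}
```

Only the error from the final attempt reaches the subscriber. Earlier errors are swallowed by Retry().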

IObservable.Select()

Projects/maps an observable item.

Observable.Return(3)
    .Select(i => i*i)
    .Subscribe(Console.WriteLine); // => 9

IObservable.SelectMany()

Projects/maps an observable item into zero or more observable items, then flattens the result into a single stream.

Observable.Return(3)
    .SelectMany(i => Observable.Range(1, i*i))
    .Subscribe(Console.WriteLine); // => {1,2,3,4,5,6,7,8,9}

IObservable.Repeat()

Repeats the observable stream by resubscribing to the (completed) observable a given number of times.

Observable.Create<int>(observer => {
        observer.OnNext(rnd.Next());
        observer.OnCompleted();
        return Disposable.Empty;
    })
    .Repeat(3)
    .Subscribe(Console.WriteLine); // => {42,1984,2038}

Observable.Zip()

Joins items from multiple sequences together by position, waiting for the slowest sequence before moving on to the next item.

Observable.Return(42)
    .SelectMany(i => {
        var numeric = new object[] { 1, 2, 3 }.ToObservable();
        var roman = new object[] { "i", "ii", "iii" }.ToObservable();
        var alpha = new object[] { 'a', 'b', 'c' }.ToObservable();

        return Observable.Zip(numeric, roman, alpha, string.Concat);
    })
    .Subscribe(Console.WriteLine); // => {1ia, 2iib, 3iiic}
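
Because Zip() pairs items by position, the result ends as soon as the shortest input is exhausted. A small sketch with two sequences of different length, assuming the Rx.Net assemblies are referenced:

```csharp
using System;
using System.Reactive.Linq;

static class ZipDemo
{
    static void Main()
    {
        var three = Observable.Range(1, 3);  // 1, 2, 3
        var five = Observable.Range(10, 5);  // 10, 11, 12, 13, 14

        three.Zip(five, (a, b) => a + " + " + b + " = " + (a + b))
            .Subscribe(
                Console.WriteLine,
                () => Console.WriteLine("completed"));
        // 1 + 10 = 11
        // 2 + 11 = 13
        // 3 + 12 = 15
        // completed
    }
}
```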

Observable.Switch()

Given an observable sequence of observable sequences (IObservable<IObservable<T>>), subscribes to the most recent inner sequence, first unsubscribing from the previous one.

var observable = from i in Observable.Interval(TimeSpan.FromSeconds(1))
                 select Observable.Create<long>(observer => {
                        Console.WriteLine("Subscribe to observable {0}", i);
                        observer.OnNext(i);
                        return Disposable.Create(() => Console.WriteLine("Unsubscribe from observable {0}", i));
                 });

observable
    .Switch()
    .Subscribe(i => Console.WriteLine("OnNext called from {0}\n", i));

// Subscribe to observable 0
// OnNext called from 0
//
// Unsubscribe from observable 0
// Subscribe to observable 1
// OnNext called from 1
//
// Unsubscribe from observable 1
// Subscribe to observable 2
// OnNext called from 2
//
// Unsubscribe from observable 2
// Subscribe to observable 3
// OnNext called from 3
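
The same behaviour can be driven by hand, without timers, using subjects. This is a sketch that assumes the Rx.Net assemblies are referenced. The subject names are hypothetical:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SwitchDemo
{
    static void Main()
    {
        var outer = new Subject<IObservable<string>>();
        var first = new Subject<string>();
        var second = new Subject<string>();

        outer.Switch().Subscribe(Console.WriteLine);

        outer.OnNext(first);         // Switch subscribes to 'first'
        first.OnNext("first: 1");    // printed
        outer.OnNext(second);        // unsubscribes from 'first', subscribes to 'second'
        first.OnNext("first: 2");    // dropped, Switch is no longer listening to 'first'
        second.OnNext("second: 1");  // printed
    }
}
```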

Limit Concurrency with Defer/Merge

Execute no more than n tasks concurrently by deferring execution of the observable factory (asyncTask) until Merge has subscribed to it.

Func<IObservable<double>> asyncTask = () =>
    {
        return Task.Run(() =>
        {
            Thread.Sleep(1000);
            return (DateTime.Now - DateTime.Today).TotalSeconds;
        }).ToObservable();
    };

Observable.Range(1, 30)
    .Select(i => Observable.Defer(asyncTask))
    .Merge(3)
    .Subscribe(Console.WriteLine);

This is useful if asyncTask is an HTTP request to a REST endpoint and you don't want to over-burden the REST service. A back-pressure mechanism would be better, but the current version of Rx.Net does not include one.

Note: RxJava and RxJS do support back-pressure, as does the Reactive Streams specification.
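
One way to convince yourself that Merge(3) really caps concurrency is to record the peak number of tasks in flight. The sketch below assumes the Rx.Net assemblies are referenced. FakeRequest is a hypothetical stand-in for the HTTP call, and the counters are illustrative:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

static class MergeLimitDemo
{
    static readonly object gate = new object();
    static int running; // tasks currently executing
    static int peak;    // highest value of 'running' observed

    // Hypothetical stand-in for an HTTP request: tracks how many copies run at once.
    static IObservable<int> FakeRequest(int id)
    {
        return Task.Run(() =>
        {
            int now = Interlocked.Increment(ref running);
            lock (gate) { if (now > peak) peak = now; }
            Thread.Sleep(100);
            Interlocked.Decrement(ref running);
            return id;
        }).ToObservable();
    }

    static void Main()
    {
        Observable.Range(1, 12)
            .Select(i => Observable.Defer(() => FakeRequest(i)))
            .Merge(3) // at most 3 inner observables subscribed at a time
            .Wait();  // block until the merged sequence completes

        // Expected to print a value no greater than 3.
        Console.WriteLine("peak concurrency: " + peak);
    }
}
```

Without the Defer() wrapper, every task would start as soon as Select() ran, and Merge(3) would only limit how many results are observed at once, not how many tasks execute.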

IObservable.Delay()

Delays each item in the observable sequence by the given time.

Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Delay(TimeSpan.FromMilliseconds(200))
    .Subscribe(startTime => Console.WriteLine((DateTimeOffset.UtcNow - startTime.Timestamp).TotalMilliseconds));

// 218.8716
// 202.0653
// 202.4405
// 202.3893
// 203.3459
// 202.5272
// 203.1784
// 203.3712
// 202.8387

Debugging with IObservable.Do()

Observable.Range(1, 3)
    .Do(Console.WriteLine) // => {1,2,3}
    .Select(i => i * i)
    .Subscribe();

or

Observable.Range(1, 3)
    .Do(x => Debugger.Break())
    .Select(i => i * i)
    .Subscribe();

Async/Await

var obs = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
obs.Subscribe(Console.WriteLine);
await obs;

and in a Console app:

var task = Task.Run(async () =>
{
    var obs = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);

    obs.Subscribe(Console.WriteLine);
    await obs;
});

task.Wait();
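
Awaiting an observable subscribes to it and yields the last item once the sequence completes. A short sketch of that behaviour, assuming the Rx.Net assemblies are referenced and a compiler that supports an async Main (C# 7.1 or later):

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class AwaitDemo
{
    static async Task Main()
    {
        // Awaiting subscribes to the sequence and resumes when it completes,
        // returning the last item it produced.
        int last = await Observable.Range(1, 5);
        Console.WriteLine(last); // 5

        // An empty sequence has no last item, so awaiting it throws.
        try
        {
            await Observable.Empty<int>();
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("empty sequence");
        }
    }
}
```

Note that await creates its own subscription, so calling Subscribe() and then awaiting a cold observable such as Interval() runs the sequence twice.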

Gotchas

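Observable.Zip emits items at the speed of the slowest stream, so it has to buffer items from the faster streams until the slower ones catch up. If the speed difference is large, the buffer keeps growing and can cause problems such as working set exhaustion.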

var obs = Observable.Return(42)
    .SelectMany(_ =>
    {
        var fast = Observable.Interval(TimeSpan.FromMilliseconds(1)).Select(__ => new byte[1024 * 1024]);
        var slow = Observable.Interval(TimeSpan.FromSeconds(1)).Select(__ => new byte[1024 * 1024]);

        return Observable.Zip(fast, slow, (a, b) => new { a, b });
    })
    .Subscribe(Console.WriteLine);

Within a minute this should fail with: An exception of type 'System.OutOfMemoryException' occurred

Useful Resources
