Written for Rx.Net 2.2.5 (released July 2014)
- Observable.Return()
- Observable.Start()
- Observable.Defer()
- IObservable.Publish()
- IObservable.Replay()
- Custom IObservable
- Custom IObservable using Observable.Create()
- Subscribing to exceptions
- IObservable.Catch()
- IObservable.Retry()
- IObservable.Select()
- IObservable.SelectMany()
- IObservable.Repeat()
- Observable.Zip()
- Observable.Switch()
- Limit concurrency with Defer/Merge
- IObservable.Delay()
- Debugging with IObservable.Do()
- Async/Await
- Gotchas
- Useful resources
Generates an observable from a single item.
var obs = Observable.Return(rnd.Next());
obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42
Generates an observable from a single item asynchronously. The item is generated immediately, not on subscription. Therefore, each subscription consumes the same item.
var obs = Observable.Start<int>(() => rnd.Next());
obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42
Generates a new observable which defers calling the passed observable factory until the observable is subscribed to. Each subscription consumes new items
var obs = Observable.Defer(() => Observable.Return(rnd.Next()));
obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 1984
Causes all subscriptions to share the same published values.
var obs = Observable
.Defer(() => Observable.Start(() => rnd.Next()))
.Publish();
obs.Subscribe(Console.WriteLine); // => 42
obs.Subscribe(Console.WriteLine); // => 42
obs.Connect();
Subscriptions made after the Connected Observable has been connected will see all items.
var obs = Observable
.Range(1, 3)
.Replay();
obs.Connect(); // Connect before subscribing
obs.Subscribe(Console.WriteLine); // => {1,2,3}
obs.Subscribe(Console.WriteLine); // => {1,2,3}
Implement the IObservable interface with a single method. Many Subscribe() extension methods on IObservable call this method.
class IntObservable : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(42);
observer.OnCompleted();
return Disposable.Create(() => { });
}
}
new IntObservable().Subscribe(Console.WriteLine); // => 42
Observable.Create() takes a subscribe function and wraps an IObservable around it.
Observable.Create<int>(observer => {
observer.OnNext(42);
observer.OnCompleted();
return Disposable.Empty;
}).Subscribe(Console.WriteLine); // => 42
But be careful not to throw any exceptions. Don't do this:
var brokenObservable = Observable.Create<int>(observer => {
throw new Exception();
return Disposable.Empty;
});
Observable.Return(42)
.Concat(brokenObservable)
.Subscribe(onNext: Console.WriteLine, onError: Console.Error.WriteLine);
The onError handler should instead be called.
Observable.Throw<int>(new Exception())
.Subscribe(Console.WriteLine, onError: Console.Error.WriteLine); // => System.Exception: Exception of type 'System.Exception' was thrown.
Catches any exception and continues the stream with a new observable sequence.
Observable.Throw<int>(new Exception())
.Catch(Observable.Return(42))
.Subscribe(Console.WriteLine); // => 42
If an error occurs, retries the sequence the given number of times or until it completes.
bool first = true;
Observable.Create<int>(observer => {
observer.OnNext(rnd.Next());
if (first)
observer.OnError(new Exception());
else
observer.OnNext(42);
first = !first;
return Disposable.Empty;
})
.Retry(2)
.Subscribe(Console.WriteLine); // => {1984,2038,42}
Projects/maps an observable item.
Observable.Return(3)
.Select(i => i*i)
.Subscribe(Console.WriteLine); // => 9
Projects/maps an observable item into zero or more observable items, then flattens the result into a single stream.
Observable.Return(3)
.SelectMany(i => Observable.Range(1, i*i))
.Subscribe(Console.WriteLine); // => {1,2,3,4,5,6,7,8,9}
Repeats the observable stream by resubscribing to the (completed) observable a given number of times.
Observable.Create<int>(observer => {
observer.OnNext(rnd.Next());
observer.OnCompleted();
return Disposable.Empty;
})
.Repeat(3)
.Subscribe(Console.WriteLine); // => {42,1984,2038}
Joins each item from multiple sequences together waiting for the slowest sequence before moving on.
Observable.Return(42)
.SelectMany(i => {
var numeric = new object[] { 1, 2, 3 }.ToObservable();
var roman = new object[] { "i", "ii", "iii" }.ToObservable();
var alpha = new object[] { 'a', 'b', 'c' }.ToObservable();
return Observable.Zip(numeric, roman, alpha, string.Concat);
})
.Subscribe(Console.WriteLine); // => {1ia, 2iib, 3iiic}
Given an observable sequence of observable sequences - IObservable<IObservable<T>>
, will subscribe to the latest observable sequence making sure it unsubscribes first from any previous sequence.
var observable = from i in Observable.Interval(TimeSpan.FromSeconds(1))
select Observable.Create<long>(observer => {
Console.WriteLine("Subscribe to observable {0}", i);
observer.OnNext(i);
return Disposable.Create(() => Console.WriteLine("Unsubscribe from observable {0}", i));
});
observable
.Switch()
.Subscribe(i => Console.WriteLine("OnNext called from {0}\n", i));
Subscribe to observable 0
OnNext called from 0
Unsubscribe from observable 0
Subscribe to observable 1
OnNext called from 1
Unsubscribe from observable 1
Subscribe to observable 2
OnNext called from 2
Unsubscribe from observable 2
Subscribe to observable 3
OnNext called from 3
Execute no more than n tasks concurrently by deferring executing of the observable factory (asyncTask) until Merge has subscribed to it.
Func<IObservable<double>> asyncTask = () =>
{
return Task.Run(() =>
{
Thread.Sleep(1000);
return (DateTime.Now - DateTime.Today).TotalSeconds;
}).ToObservable();
};
Observable.Range(1, 30)
.Select(i => Observable.Defer(asyncTask))
.Merge(3)
.Subscribe(Console.WriteLine);
This is useful if asyncTask is an HTTP request to a REST endpoint and you don't want to over-burden the REST service. Would be better to use a back-pressure mechanism, but the current version of Rx.Net does not include back-pressure.
Note: RxJava and RxJS do suport back-pressure along with Reactive Stream specification.
Delays each item in the observable sequence by the given time.
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Delay(TimeSpan.FromMilliseconds(200))
.Subscribe(startTime => Console.WriteLine((DateTimeOffset.UtcNow - startTime.Timestamp).TotalMilliseconds));
// 218.8716
// 202.0653
// 202.4405
// 202.3893
// 203.3459
// 202.5272
// 203.1784
// 203.3712
// 202.8387
Observable.Range(1, 3)
.Do(Console.WriteLine) // => {1,2,3}
.Select(i => i * i)
.Subscribe();
or
Observable.Range(1, 3)
.Do(x => Debugger.Break())
.Select(i => i * i)
.Subscribe();
var obs = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
obs.Subscribe(Console.WriteLine);
await obs;
and in a Console app:
var task = Task.Run(async () =>
{
var obs = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
obs.Subscribe(Console.WriteLine);
await obs;
});
task.Wait();
Observable.Zip emits items at the speed of the slowest stream. It therefore caches items from faster streams giving problems such as working set exhaustion.
var obs = Observable.Return(42)
.SelectMany(_ =>
{
var fast = Observable.Interval(TimeSpan.FromMilliseconds(1)).Select(__ => new byte[1024 * 1024]);
var slow = Observable.Interval(TimeSpan.FromSeconds(1)).Select(__ => new byte[1024 * 1024]);
return Observable.Zip(fast, slow, (a, b) => new { a, b });
})
.Subscribe(Console.WriteLine);
Within a minute should produce: An exception of type 'System.OutOfMemoryException' occurred