Skip to content

Instantly share code, notes, and snippets.

@jkone27
Forked from minhhungit/101-rx-samples.md
Created November 25, 2022 17:07
Show Gist options
  • Save jkone27/5ebc18b2b59ff72879ed7e7c70483da4 to your computer and use it in GitHub Desktop.
Save jkone27/5ebc18b2b59ff72879ed7e7c70483da4 to your computer and use it in GitHub Desktop.
101 Rx Samples in C#

101 Rx Samples in C#

This was taken from http://rxwiki.wikidot.com/101samples, because I wanted to be able to read it more comfortable with syntax highlighting.

Here's the unedited original, translated to Github Markdown glory:

101 Rx Samples - a work in progress

You!

Yes, you, the one who is still scratching their head trying to figure out this Rx thing. As you learn and explore, please feel free add your own samples here (or tweak existing ones!) Anyone can (and should!) edit this page. (edit button is at the bottom right of each page)

(and sorry for the years of spam pages there - they should be gone now. Thanks for marking them. -Rob)

Table of Contents

Asynchronous Background Operations

Start - Run Code Asynchronously

public static void StartBackgroundWork() {
    Console.WriteLine("Shows use of Start to start on a background thread:");
    var o = Observable.Start(() =>
    {
        //This starts on a background thread.
        Console.WriteLine("From background thread. Does not block main thread.");
        Console.WriteLine("Calculating...");
        Thread.Sleep(3000);
        Console.WriteLine("Background work completed.");
    }).Finally(() => Console.WriteLine("Main thread completed."));
    Console.WriteLine("\r\n\t In Main Thread...\r\n");
    o.Wait();   // Wait for completion of background operation.
}

Run a method asynchronously on demand

Execute a long-running method asynchronously. The method does not start running until there is a subscriber. The method is started every time the observable is created and subscribed, so there could be more than one running at once.

// Synchronous operation
public DataType DoLongRunningOperation(string param)
{
    ...
}

public IObservable<DataType> LongRunningOperationAsync(string param)
{
    return Observable.Create<DataType>(
        o => Observable.ToAsync<string,DataType>(DoLongRunningOperation)(param).Subscribe(o)
    );
}

CombineLatest - Parallel Execution

Merges the specified observable sequences into one observable sequence by emitting a list with the latest source elements whenever any of the observable sequences produces an element.

public async void ParallelExecutionTest()
{
    var o = Observable.CombineLatest(
        Observable.Start(() => { Console.WriteLine("Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result A"; }),
        Observable.Start(() => { Console.WriteLine("Executing 2nd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result B"; }),
        Observable.Start(() => { Console.WriteLine("Executing 3rd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result C"; }) 
    ).Finally(() => Console.WriteLine("Done!"));

    foreach (string r in await o.FirstAsync())
        Console.WriteLine(r);
}

Result
Executing 1st on Thread: 3
Executing 2nd on Thread: 4
Executing 3rd on Thread: 3
Done!
Result A
Result B
Result C

Note Was ForkJoin which is no longer supported. CombineLatest gives the same result.)

Create With Disposable & Scheduler - Canceling an asynchronous operation

This sample starts a background operation that generates a sequence of integers until it is canceled by the main thread. To start the background operation new the Scheduler class is used and a CancellationTokenSource is indirectly created by a Observable.Create.
Please check out the MSDN documentation on System.Threading.CancellationTokenSource to learn more about cancellation source.

IObservable<int> ob =
    Observable.Create<int>(o =>
        {
            var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
            NewThreadScheduler.Default.Schedule(() =>
                {
                    int i = 0;
                    for (; ; )
                    {
                        Thread.Sleep(200);  // here we do the long lasting background operation
                        if (!cancel.Token.IsCancellationRequested)    // check cancel token periodically
                            o.OnNext(i++);
                        else
                        {
                            Console.WriteLine("Aborting because cancel event was signaled!");
                            o.OnCompleted();
                            return;
                        }
                    }
                }
            );

            return cancel;
        }
    );

IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
Console.WriteLine("Press any key to cancel");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to quit");
Console.ReadKey();  // give background thread chance to write the cancel acknowledge message

Observation Operators

Observing an Event - Simple

class ObserveEvent_Simple
{
    public static event EventHandler SimpleEvent;
    static void Main()
    {
        // To consume SimpleEvent as an IObservable:
        var eventAsObservable = Observable.FromEventPattern(
            ev => SimpleEvent += ev,
            ev => SimpleEvent -= ev);
    }
}

Alternately, you can use EventArgs:

public static event EventHandler<EventArgs> SimpleEvent;

private static void Main(string[] args) {
    var eventAsObservable = Observable.FromEventPattern<EventArgs>
        (ev => SimpleEvent += ev,
         ev => SimpleEvent -= ev);
 }

Observing an Event - Simple (expanded)

class ObserveEvent_Simple
{
    public static event EventHandler SimpleEvent;

    private static void Main()
    {
        Console.WriteLine("Setup observable");
        // To consume SimpleEvent as an IObservable:
        var eventAsObservable = Observable.FromEventPattern(
                ev => SimpleEvent += ev,
                ev => SimpleEvent -= ev);

        // SimpleEvent is null until we subscribe
        Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");

        Console.WriteLine("Subscribe");
        //Create two event subscribers
        var s = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for s subscriber"));
        var t = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for t subscriber"));

        // After subscribing the event handler has been added
        Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");

        Console.WriteLine("Raise event");
        if (null != SimpleEvent)
        {
            SimpleEvent(null, EventArgs.Empty);
        }

        // Allow some time before unsubscribing or event may not happen
        Thread.Sleep(100);

        Console.WriteLine("Unsubscribe");
        s.Dispose();
        t.Dispose();

        // After unsubscribing the event handler has been removed
        Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");

        Console.ReadKey();
    }
}

Observing MouseMove in Silverlight

var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
mouseMove.ObserveOnDispatcher()
         .Subscribe(args => Debug.WriteLine(args.EventArgs.GetPosition(this)));

Note that a reference to System.Reactive.Windows.Threading is required for ObserveOnDispatcher which is in Nuget as Reactive Extensions - Silverlight Helpers.

Observing an Event - Generic

class ObserveEvent_Generic
{
    public class SomeEventArgs : EventArgs { }
    public static event EventHandler<SomeEventArgs> GenericEvent;

    static void Main()
    {
        // To consume GenericEvent as an IObservable:
        IObservable<EventPattern<SomeEventArgs>> eventAsObservable = Observable.FromEventPattern<SomeEventArgs>(
            ev => GenericEvent += ev,
            ev => GenericEvent -= ev );
    }
}

Observing an Event - Non-Generic

class ObserveEvent_NonGeneric
{
    public class SomeEventArgs : EventArgs { }
    public delegate void SomeNonGenericEventHandler(object sender, SomeEventArgs e);
    public static event SomeNonGenericEventHandler NonGenericEvent;

    static void Main()
    {
        // To consume NonGenericEvent as an IObservable, first inspect the type of EventArgs used in the second parameter of the delegate.
        // In this case, it is SomeEventArgs.  Then, use as shown below.
        IObservable<IEvent<SomeEventArgs>> eventAsObservable = Observable.FromEvent(
            (EventHandler<SomeEventArgs> ev) => new SomeNonGenericEventHandler(ev), 
            ev => NonGenericEvent += ev,
            ev => NonGenericEvent -= ev);
    }
}

Observing an Asynchronous Operation

class Observe_IAsync
{
    static void Main()
    {
        // We will use Stream's BeginRead and EndRead for this sample.
        Stream inputStream = Console.OpenStandardInput();

        // To convert an asynchronous operation that uses the IAsyncResult pattern to a function that returns an IObservable, use the following format.  
        // For the generic arguments, specify the types of the arguments of the Begin* method, up to the AsyncCallback.
        // If the End* method returns a value, append this as your final generic argument.
        var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);

        // Now, you can get an IObservable instead of an IAsyncResult when calling it.
        byte[] someBytes = new byte[10];
        IObservable<int> observable = read(someBytes, 0, 10);
    }
}

Be aware that while the code above formally provides an observable, this is not enough for most intended uses. For more information, see
Creating an observable sequence and
c# - What is the proper way to create an Observable which reads a stream to the end - Stack Overflow.

Observing a Generic IEnumerable

class Observe_GenericIEnumerable
{
    static void Main()
    {
        IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };

        // To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
        IObservable<int> observable = someInts.ToObservable();
    }
}

Observing a Non-Generic IEnumerable - Single Type

class Observe_NonGenericIEnumerableSingleType
{
    static void Main()
    {
        IEnumerable someInts = new object[] { 1, 2, 3, 4, 5 };

        // To convert a non-generic IEnumerable that contains elements of a single type,
        // first use Cast<> to change the non-generic enumerable into a generic enumerable,
        // then use ToObservable.
        IObservable<int> observable = someInts.Cast<int>().ToObservable();
    }
}

Observing a Non-Generic IEnumerable - Multiple Types

Observing the Passing of Time

class Observe_Time
{
    static void Main()
    {
        // To observe time passing, use the Observable.Interval function.
        // It will notify you on a time interval you specify.

        // 0 after 1s, 1 after 2s, 2 after 3s, etc.
        IObservable<long> oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
        IObservable<long> alsoOneNumberPerSecond = Observable.Interval(1000 /* milliseconds */);
    }
}

Restriction Operators

Where - Simple

class Where_Simple
{
    static void Main()
    {
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

        var lowNums = from n in oneNumberPerSecond
                      where n < 5
                      select n;

        Console.WriteLine("Numbers < 5:");

        lowNums.Subscribe(lowNum =>
        {
            Console.WriteLine(lowNum);
        });

        Console.ReadKey();
    }
}

Result
Numbers < 5:
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
4 (after 5s)

Where - Drilldown

class Where_DrillDown
{
    class Customer
    {
        public Customer() { Orders = new ObservableCollection<Order>(); }
        public string CustomerName { get; set; }
        public string Region { get; set; }
        public ObservableCollection<Order> Orders { get; private set; }
    }

    class Order
    {
        public int OrderId { get; set; }
        public DateTimeOffset OrderDate { get; set; }
    }

    static void Main()
    {
        var customers = new ObservableCollection<Customer>();

        var customerChanges = Observable.FromEventPattern(
            (EventHandler<NotifyCollectionChangedEventArgs> ev)
               => new NotifyCollectionChangedEventHandler(ev),
            ev => customers.CollectionChanged += ev,
            ev => customers.CollectionChanged -= ev);

        var watchForNewCustomersFromWashington =
            from c in customerChanges
            where c.EventArgs.Action == NotifyCollectionChangedAction.Add
            from cus in c.EventArgs.NewItems.Cast<Customer>().ToObservable()
            where cus.Region == "WA"
            select cus;

        Console.WriteLine("New customers from Washington and their orders:");

        watchForNewCustomersFromWashington.Subscribe(cus =>
        {
            Console.WriteLine("Customer {0}:", cus.CustomerName);

            foreach (var order in cus.Orders)
            {
                Console.WriteLine("Order {0}: {1}", order.OrderId, order.OrderDate);
            }
        });

        customers.Add(new Customer
        {
            CustomerName = "Lazy K Kountry Store",
            Region = "WA",
            Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 1 } }
        });

        Thread.Sleep(1000);
        customers.Add(new Customer
        {
            CustomerName = "Joe's Food Shop",
            Region = "NY",
            Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 2 } }
        });

        Thread.Sleep(1000);
        customers.Add(new Customer
        {
            CustomerName = "Trail's Head Gourmet Provisioners",
            Region = "WA",
            Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 3 } }
        });

        Console.ReadKey();
    }
}

Result
New customers from Washington and their orders:
Customer Lazy K Kountry Store: (after 0s)
Order 1: 11/20/2009 11:52:02 AM -06:00
Customer Trail's Head Gourmet Provisioners: (after 2s)
Order 3: 11/20/2009 11:52:04 AM -06:00

Projection Operators

Select - Simple

class Select_Simple
{
    static void Main()
    {
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

        var numbersTimesTwo = from n in oneNumberPerSecond
                              select n * 2;

        Console.WriteLine("Numbers * 2:");

        numbersTimesTwo.Subscribe(num =>
        {
            Console.WriteLine(num);
        });

        Console.ReadKey();
    }
}

Result
Numbers * 2:
0 (after 1s)
2 (after 2s)
4 (after 3s)
6 (after 4s)
8 (after 5s)

Select - Transformation

class Select_Transform
{
    static void Main()
    {
        var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

        var stringsFromNumbers = from n in oneNumberPerSecond
                                 select new string('*', (int)n);

        Console.WriteLine("Strings from numbers:");

        stringsFromNumbers.Subscribe(num =>
        {
            Console.WriteLine(num);
        });

        Console.ReadKey();
    }
}

Result
Strings from numbers:
(after 0s)
(after 1s)
(after 2s)
(after 3s)
*** (after 4s)
(after 5s)
(after 6s)

Select - Indexed

class Where_Indexed
{
    class TimeIndex
    {
        public TimeIndex(int index, DateTimeOffset time)
        {
            Index = index;
            Time = time;
        }
        public int Index { get; set; }
        public DateTimeOffset Time { get; set; }
    }

    static void Main()
    {
        var clock = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select((t, index) => new TimeIndex(index, DateTimeOffset.Now));

        clock.Subscribe(timeIndex =>
        {
            Console.WriteLine(
                "Ding dong.  The time is now {0:T}.  This is event number {1}.",
                timeIndex.Time,
                timeIndex.Index);
        });

        Console.ReadKey();
    }
}

Result
Ding dong. The time is now 1:55:00 PM. This is event number 0. (after 0s)
Ding dong. The time is now 1:55:01 PM. This is event number 1. (after 1s)
Ding dong. The time is now 1:55:02 PM. This is event number 2. (after 2s)
Ding dong. The time is now 1:55:03 PM. This is event number 3. (after 3s)
Ding dong. The time is now 1:55:04 PM. This is event number 4. (after 4s)
Ding dong. The time is now 1:55:05 PM. This is event number 5. (after 5s)

Grouping

Group By - Simple

This example counts how many time you press each key as you furiously hit the keyboard. :)

class GroupBy_Simple
{
    static IEnumerable<ConsoleKeyInfo> KeyPresses()
    {
        for (; ; )
        {
            var currentKey = Console.ReadKey(true);

            if (currentKey.Key == ConsoleKey.Enter)
                yield break;
            else
                yield return currentKey;
        }
    }
    static void Main()
    {
        var timeToStop = new ManualResetEvent(false);
        var keyPresses = KeyPresses().ToObservable();

        var groupedKeyPresses =
            from k in keyPresses
            group k by k.Key into keyPressGroup
            select keyPressGroup;

        Console.WriteLine("Press Enter to stop.  Now bang that keyboard!");

        groupedKeyPresses.Subscribe(keyPressGroup =>
        {
            int numberPresses = 0;

            keyPressGroup.Subscribe(keyPress =>
            {
                Console.WriteLine(
                    "You pressed the {0} key {1} time(s)!",
                    keyPress.Key,
                    ++numberPresses);
            },
            () => timeToStop.Set());
        });

        timeToStop.WaitOne();
    }
}

Result
Depends on what you press! But something like:
Press Enter to stop. Now bang that keyboard!
You pressed the A key 1 time(s)!
You pressed the A key 2 time(s)!
You pressed the B key 1 time(s)!
You pressed the B key 2 time(s)!
You pressed the C key 1 time(s)!
You pressed the C key 2 time(s)!
You pressed the C key 3 time(s)!
You pressed the A key 3 time(s)!
You pressed the B key 3 time(s)!
You pressed the A key 4 time(s)!
You pressed the A key 5 time(s)!
You pressed the C key 4 time(s)!

Time-Related Operators

Buffer - Simple

Buffer has a strange name, but a simple concept.
Imagine an email program that checks for new mail every 5 minutes. While you can receive mail at any instant in time, you only get a batch of emails at every five minute mark.
Let's use Buffer to simulate this.

class Buffer_Simple
{
    static IEnumerable<string> EndlessBarrageOfEmail()
    {
        var random = new Random();
        var emails = new List<String> { "Here is an email!", "Another email!", "Yet another email!" };
        for (; ; )
        {
            // Return some random emails at random intervals.
            yield return emails[random.Next(emails.Count)];
            Thread.Sleep(random.Next(1000));
        }
    }
    static void Main()
    {
        var myInbox = EndlessBarrageOfEmail().ToObservable();

        // Instead of making you wait 5 minutes, we will just check every three seconds instead. :)
        var getMailEveryThreeSeconds = myInbox.Buffer(TimeSpan.FromSeconds(3)); //  Was .BufferWithTime(...

        getMailEveryThreeSeconds.Subscribe(emails =>
        {
            Console.WriteLine("You've got {0} new messages!  Here they are!", emails.Count());
            foreach (var email in emails)
            {
                Console.WriteLine("> {0}", email);
            }
            Console.WriteLine();
        });

        Console.ReadKey();
    }
}

Result
You've got 5 new messages! Here they are! (after 3s)
> Here is an email!
> Another email!
> Here is an email!
> Another email!
> Here is an email!

You've got 6 new messages! Here they are! (after 6s)
> Another email!
> Another email!
> Here is an email!
> Here is an email!
> Another email!
> Another email!

Delay - Simple

class Delay_Simple
{
    static void Main()
    {
        var oneNumberEveryFiveSeconds = Observable.Interval(TimeSpan.FromSeconds(5));

        // Instant echo
        oneNumberEveryFiveSeconds.Subscribe(num =>
        {
            Console.WriteLine(num);
        });

        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).Subscribe(num =>
        {
            Console.WriteLine("...{0}...", num);
        });

        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).Subscribe(num =>
        {
            Console.WriteLine("......{0}......", num);
        });

        Console.ReadKey();
    }
}

Result
0 (after 5s)
…0… (after 6s)
……0…… (after 7s)
1 (after 10s)
…1… (after 11s)
……1…… (after 12s)

Interval - Simple

internal class Interval_Simple
{
    private static void Main()
    {
        IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

        using (observable.Subscribe(Console.WriteLine))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)

Sample - Simple

internal class Sample_Simple
{
    private static void Main()
    {
        // Generate sequence of numbers, (an interval of 50 ms seems to result in approx 16 per second).
        IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(50));

        // Sample the sequence every second
        using (observable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(
            x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
15: 24/11/2009 15:40:45 (after 1s)
31: 24/11/2009 15:40:46 (after 2s)
47: 24/11/2009 15:40:47 (after 3s)
64: 24/11/2009 15:40:48 (after 4s)

Throttle - Simple

Throttle stops the flow of events until no more events are produced for a specified period of time. For example, if you throttle a TextChanged event of a textbox to .5 seconds, no events will be passed until the user has stopped typing for .5 seconds. This is useful in search boxes where you do not want to start a new search after every keystroke, but want to wait until the user pauses.

SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher().Subscribe(e => this.ListItems.Add(this.textBox.Text));

Here is another example:

internal class Throttle_Simple
{
    // Generates events with interval that alternates between 500ms and 1000ms every 5 events
    static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
    {
        int i = 0;

        while(true)
        {
            if(i > 1000)
            {
                yield break;
            }
            yield return i;
            Thread.Sleep( i++ % 10 < 5 ? 500 : 1000);
        }
    }

    private static void Main()
    {
        var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
        var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));

        using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
5:
6:
7:
8:
9:
15:
16:
17:
18:
19:
…etc

Interval - With TimeInterval() - Simple

internal class TimeInterval_Simple
{
    // Like TimeStamp but gives the time-interval between successive values
    private static void Main()
    {
        var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

        using (observable.Subscribe(
            x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
0: 00:00:00.8090459 (1st value)
1: 00:00:00.7610435 (2nd value)
2: 00:00:00.7650438 (3rd value)

Interval - With TimeInterval() - Remove

internal class TimeInterval_Remove
{
    private static void Main()
    {
        // Add a time interval
        var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

        // Remove it again
        using (observable.RemoveTimeInterval().Subscribe(Console.WriteLine))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
0
1
2

Timeout - Simple

internal class Timeout_Simple
{
    private static void Main()
    {
        Console.WriteLine(DateTime.Now);

        // create a single event in 10 seconds time
        var observable = Observable.Timer(TimeSpan.FromSeconds(10)).Timestamp();

        // raise exception if no event received within 9 seconds
        var observableWithTimeout = Observable.Timeout(observable, TimeSpan.FromSeconds(9));

        using (observableWithTimeout.Subscribe(
            x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp), 
            ex => Console.WriteLine("{0} {1}", ex.Message, DateTime.Now)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
02/12/2009 10:13:00
Press any key to unsubscribe
The operation has timed out. 02/12/2009 10:13:09

Timer - Simple

Observable.Interval is a simple wrapper around Observable.Timer.

internal class Timer_Simple
{
    private static void Main()
    {
        Console.WriteLine(DateTime.Now);

        var observable = Observable.Timer(TimeSpan.FromSeconds(5), 
                                                       TimeSpan.FromSeconds(1)).Timestamp();

        // or, equivalently
        // var observable = Observable.Timer(DateTime.Now + TimeSpan.FromSeconds(5), 
        //                                                TimeSpan.FromSeconds(1)).Timestamp();

        using (observable.Subscribe(
            x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
02/12/2009 10:02:29
Press any key to unsubscribe
0: 02/12/2009 10:02:34(after 5s)
1: 02/12/2009 10:02:35 (after 6s)
2: 02/12/2009 10:02:36 (after 7s)

Timestamp - Simple

Adds a TimeStamp to each element using the system's local time.

internal class Timestamp_Simple
{
    private static void Main()
    {
        var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

        using (observable.Subscribe(
            x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
0: 24/11/2009 15:40:45 (after 1s)
1: 24/11/2009 15:40:46 (after 2s)
2: 24/11/2009 15:40:47 (after 3s)
3: 24/11/2009 15:40:48 (after 4s)

Timestamp - Remove

internal class Timestamp_Remove
{
    private static void Main()
    {
        // Add timestamp
        var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

        // Remove it
        using (observable.RemoveTimestamp().Subscribe(Console.WriteLine))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)

Window and Joins

Window

Divides a stream into "Windows" of time. For example, 5 five second window would contain all elements pushed in that five second interval.

IObservable<long> mainSequence = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<IObservable<long>> seqWindowed = mainSequence.Window(() =>
    {
        IObservable<long> seqWindowControl = Observable.Interval(TimeSpan.FromSeconds(6));
        return seqWindowControl;
    });

seqWindowed.Subscribe(seqWindow =>
    {
        Console.WriteLine("\nA new window into the main sequence has opened: {0}\n",
                            DateTime.Now.ToString());
        seqWindow.Subscribe(x => { Console.WriteLine("Integer : {0}", x); });
    });

Console.ReadLine();

GroupJoin - Joins two streams matching by one of their attributes

var leftList = new List<string[]>();
leftList.Add(new string[] { "2013-01-01 02:00:00", "Batch1" });
leftList.Add(new string[] { "2013-01-01 03:00:00", "Batch2" });
leftList.Add(new string[] { "2013-01-01 04:00:00", "Batch3" });

var rightList = new List<string[]>();
rightList.Add(new string[] { "2013-01-01 01:00:00", "Production=2" });
rightList.Add(new string[] { "2013-01-01 02:00:00", "Production=0" });
rightList.Add(new string[] { "2013-01-01 03:00:00", "Production=3" });

var l = leftList.ToObservable();
var r = rightList.ToObservable();

var q = l.GroupJoin(r,
    _ => Observable.Never<Unit>(), // windows from each left event going on forever
    _ => Observable.Never<Unit>(), // windows from each right event going on forever
    (left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events

// e is a tuple with two items, left and obsOfRight
q.Subscribe(e =>
{
    var xs = e.Item2;
    xs.Where(
     x => x[0] == e.Item1[0]) // filter only when datetime matches
     .Subscribe(
     v =>
     {
        Console.WriteLine(
           string.Format("{0},{1} and {2},{3} occur at the same time",
           e.Item1[0],
           e.Item1[1],
           v[0],
           v[1]
        ));
     });
});

Range

Generates a Range of values. Useful for testing purposes.

Range - Prints from 1 to 10.

IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();

Generate

There are several overloads for Generate.

Generate - simple

A simple use is to replicate Interval but have the sequence stop.

internal class Generate_Simple
{
    private static void Main()
    {
        var observable =
            Observable.Generate(1, x => x < 6, x => x + 1, x => x, 
                                         x=>TimeSpan.FromSeconds(1)).Timestamp();

        using (observable.Subscribe(x => Console.WriteLine("{0}, {1}", x.Value, x.Timestamp)))
        {
            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

Result
1: 24/11/2009 15:40:45 (after 1s)
2: 24/11/2009 15:40:46 (after 2s)
3: 24/11/2009 15:40:47 (after 3s)
4: 24/11/2009 15:40:48 (after 4s)
5: 24/11/2009 15:40:49 (after 5s)

ISubject and ISubject<T1, T2>

There are several implementations for ISubject.

Ping Pong Actor Model with ISubject<T1, T2>

using System;
using System.Collections.Generic;
using System.Linq;

namespace RxPingPong
{
    /// <summary>Simple Ping Pong Actor model using Rx </summary>
    /// <remarks>
    /// You'll need to install the Reactive Extensions (Rx) for this to work.
    /// You can get the installer from <see href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx"/>
    /// </remarks>
    class Program
    {
        static void Main(string[] args)
        {
            var ping = new Ping();
            var pong = new Pong();

            Console.WriteLine("Press any key to stop ...");

            var pongSubscription = ping.Subscribe(pong);
            var pingSubscription = pong.Subscribe(ping);

            Console.ReadKey();

            pongSubscription.Dispose();
            pingSubscription.Dispose();

            Console.WriteLine("Ping Pong has completed.");
        }
    }

    class Ping : ISubject<Pong, Ping>
    {
        #region Implementation of IObserver<Pong>

        /// <summary>
        /// Notifies the observer of a new value in the sequence.
        /// </summary>
        public void OnNext(Pong value)
        {
            Console.WriteLine("Ping received Pong.");
        }

        /// <summary>
        /// Notifies the observer that an exception has occurred.
        /// </summary>
        public void OnError(Exception exception)
        {
            Console.WriteLine("Ping experienced an exception and had to quit playing.");
        }

        /// <summary>
        /// Notifies the observer of the end of the sequence.
        /// </summary>
        public void OnCompleted()
        {
            Console.WriteLine("Ping finished.");
        }

        #endregion

        #region Implementation of IObservable<Ping>

        /// <summary>
        /// Subscribes an observer to the observable sequence.
        /// </summary>
        public IDisposable Subscribe(IObserver<Ping> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(2))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }

        #endregion

        #region Implementation of IDisposable

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        /// <filterpriority>2</filterpriority>
        public void Dispose()
        {
            OnCompleted();
        }

        #endregion
    }

    class Pong : ISubject<Ping, Pong>
    {
        #region Implementation of IObserver<Ping>

        /// <summary>
        /// Notifies the observer of a new value in the sequence.
        /// </summary>
        public void OnNext(Ping value)
        {
            Console.WriteLine("Pong received Ping.");
        }

        /// <summary>
        /// Notifies the observer that an exception has occurred.
        /// </summary>
        public void OnError(Exception exception)
        {
            Console.WriteLine("Pong experienced an exception and had to quit playing.");
        }

        /// <summary>
        /// Notifies the observer of the end of the sequence.
        /// </summary>
        public void OnCompleted()
        {
            Console.WriteLine("Pong finished.");
        }

        #endregion

        #region Implementation of IObservable<Pong>

        /// <summary>
        /// Subscribes an observer to the observable sequence.
        /// </summary>
        public IDisposable Subscribe(IObserver<Pong> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(1.5))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }

        #endregion

        #region Implementation of IDisposable

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        /// <filterpriority>2</filterpriority>
        public void Dispose()
        {
            OnCompleted();
        }

        #endregion
    }
}

Result
1: Ping received Pong.
2: Pong received Ping.
3: Ping received Pong.
4: Pong received Ping.
5: Ping received Pong.

Combination Operators

Merge

The Merge operator combine two or more sequences. In the following example, the two streams are merged into one so that both are printed with one subscription. Also note the use of "using" to wrap the Observable, thus ensuring the subscription is Disposed.

class Merge
{
    private static IObservable<int> Xs
    {
        get { return Generate(0, new List<int> {1, 2, 2, 2, 2}); }
    }

    private static IObservable<int> Ys
    {
        get { return Generate(100, new List<int> {2, 2, 2, 2, 2}); }
    }

    private static IObservable<int> Generate(int initialValue, IList<int> intervals)
    {
        // work-around for Observable.Generate calling timeInterval before resultSelector
        intervals.Add(0); 

        return Observable.Generate(initialValue,
                                   x => x < initialValue + intervals.Count - 1,
                                   x => x + 1,
                                   x => x,
                                   x => TimeSpan.FromSeconds(intervals[x - initialValue]));
    }

    private static void Main()
    {
        Console.WriteLine("Press any key to unsubscribe");

        using (Xs.Merge(Ys).Timestamp().Subscribe(
            z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
            () => Console.WriteLine("Completed, press a key")))
        {
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
0: 11/12/2009 12:17:44
100: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
101: 11/12/2009 12:17:47
2: 11/12/2009 12:17:48
102: 11/12/2009 12:17:49
3: 11/12/2009 12:17:50
103: 11/12/2009 12:17:51
4: 11/12/2009 12:17:52
104: 11/12/2009 12:17:53

Publish - Sharing a subscription with multiple Observers

class Publish
{
    private static void Main()
    {
        var unshared = Observable.Range(1, 4);

        // Each subscription starts a new sequence
        unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #1: " + i));
        unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #2: " + i));

        Console.WriteLine();

        // By using publish the subscriptions are shared, but the sequence doesn't start until Connect() is called.
        var shared = unshared.Publish();
        shared.Subscribe(i => Console.WriteLine("Shared Subscription #1: " + i));
        shared.Subscribe(i => Console.WriteLine("Shared Subscription #2: " + i));
        shared.Connect();

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
Unshared Subscription #1: 1
Unshared Subscription #1: 2
Unshared Subscription #1: 3
Unshared Subscription #1: 4
Unshared Subscription #2: 1
Unshared Subscription #2: 2
Unshared Subscription #2: 3
Unshared Subscription #2: 4

Shared Subscription #1: 1
Shared Subscription #2: 1
Shared Subscription #1: 2
Shared Subscription #2: 2
Shared Subscription #1: 3
Shared Subscription #2: 3
Shared Subscription #1: 4
Shared Subscription #2: 4

Zip

class Zip
{
    // same code as above for Merge...

    private static void Main()
    {
        Console.WriteLine("Press any key to unsubscribe");

        using (Xs.Zip(Ys, (x, y) => x + y).Timestamp().Subscribe(
            z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
            () => Console.WriteLine("Completed, press a key")))
        {
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
100: 11/12/2009 12:17:45
102: 11/12/2009 12:17:47
104: 11/12/2009 12:17:49
106: 11/12/2009 12:17:51
108: 11/12/2009 12:17:53

CombineLatest

class CombineLatest
{
    // same code as above for Merge...

    private static void Main()
    {
        Console.WriteLine("Press any key to unsubscribe");

        using (Xs.CombineLatest(Ys, (x, y) => x + y).Timestamp().Subscribe(
            z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
            () => Console.WriteLine("Completed, press a key")))
        {
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
100: 11/12/2009 12:17:45
101: 11/12/2009 12:17:46
102: 11/12/2009 12:17:47
103: 11/12/2009 12:17:48
104: 11/12/2009 12:17:49
105: 11/12/2009 12:17:50
106: 11/12/2009 12:17:51
107: 11/12/2009 12:17:52
108: 11/12/2009 12:17:53

Concat - cold observable

class ConcatCold
{
    private static IObservable<int> Xs
    {
        get { return Generate(0, new List<int> {0, 1, 1}); }
    }

    private static IObservable<int> Ys
    {
        get { return Generate(100, new List<int> {1, 1, 1}); }
    }

    // same Generate() method as above for Merge...

    private static void Main()
    {
        Console.WriteLine("Press any key to unsubscribe");

        Console.WriteLine(DateTime.Now);

        using (Xs.Concat(Ys).Timestamp().Subscribe(
            z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
            () => Console.WriteLine("Completed, press a key")))
        {
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
100: 11/12/2009 12:17:48
101: 11/12/2009 12:17:49
102: 11/12/2009 12:17:50

Concat - hot observable

class ConcatHot
{
    private static IObservable<int> Xs
    {
        get { return Generate(0, new List<int> {0, 1, 1}); }
    }

    private static IObservable<int> Ys
    {
        get { return Generate(100, new List<int> {1, 1, 1}).Publish(); }
    }

    // same Generate() method as above for Merge...

    private static void Main()
    {
        Console.WriteLine("Press any key to unsubscribe");

        Console.WriteLine(DateTime.Now);

        using (Xs.Concat(Ys).Timestamp().Subscribe(
            z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
            () => Console.WriteLine("Completed, press a key")))
        {
            Console.ReadKey();
        }

        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
    }
}

result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
102: 11/12/2009 12:17:48

Make your class native to IObservable

If you are about to build new system, you could consider using just IObservable.

Use Subject as backend for IObservable

class UseSubject
{
    public class Order
    {            
        private DateTime? _paidDate;

        private readonly Subject<Order> _paidSubj = new Subject<Order>();
        public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

        public void MarkPaid(DateTime paidDate)
        {
            _paidDate = paidDate;                
            _paidSubj.OnNext(this); // Raise PAID event
        }
    }

    private static void Main()
    {
        var order = new Order();
        order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

        order.MarkPaid(DateTime.Now);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment