This was taken from http://rxwiki.wikidot.com/101samples, because I wanted to be able to read it more comfortable with syntax highlighting.
Here's the unedited original, translated to Github Markdown glory:
You!
Yes, you, the one who is still scratching their head trying to figure out this Rx thing. As you learn and explore, please feel free add your own samples here (or tweak existing ones!) Anyone can (and should!) edit this page. (edit button is at the bottom right of each page)
(and sorry for the years of spam pages there - they should be gone now. Thanks for marking them. -Rob)
- Asynchronous Background Operations
- Observation Operators
- Observing an Event - Simple
- Observing an Event - Simple (expanded)
- Observing MouseMove in Silverlight
- Observing an Event - Generic
- Observing an Event - Non-Generic
- Observing an Asynchronous Operation
- Observing a Generic IEnumerable
- Observing a Non-Generic IEnumerable - Single Type
- Observing a Non-Generic IEnumerable - Multiple Types
- Observing the Passing of Time
- Restriction Operators
- Projection Operators
- Grouping
- Time-Related Operators
- Window and Joins
- Range
- Generate
- ISubject and ISubject<T1, T2>
- Combination Operators
- Make your class native to IObservable
public static void StartBackgroundWork() {
Console.WriteLine("Shows use of Start to start on a background thread:");
var o = Observable.Start(() =>
{
//This starts on a background thread.
Console.WriteLine("From background thread. Does not block main thread.");
Console.WriteLine("Calculating...");
Thread.Sleep(3000);
Console.WriteLine("Background work completed.");
}).Finally(() => Console.WriteLine("Main thread completed."));
Console.WriteLine("\r\n\t In Main Thread...\r\n");
o.Wait(); // Wait for completion of background operation.
}
Execute a long-running method asynchronously. The method does not start running until there is a subscriber. The method is started every time the observable is created and subscribed, so there could be more than one running at once.
// Synchronous operation
public DataType DoLongRunningOperation(string param)
{
...
}
public IObservable<DataType> LongRunningOperationAsync(string param)
{
return Observable.Create<DataType>(
o => Observable.ToAsync<string,DataType>(DoLongRunningOperation)(param).Subscribe(o)
);
}
Merges the specified observable sequences into one observable sequence by emitting a list with the latest source elements whenever any of the observable sequences produces an element.
public async void ParallelExecutionTest()
{
var o = Observable.CombineLatest(
Observable.Start(() => { Console.WriteLine("Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result A"; }),
Observable.Start(() => { Console.WriteLine("Executing 2nd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result B"; }),
Observable.Start(() => { Console.WriteLine("Executing 3rd on Thread: {0}", Thread.CurrentThread.ManagedThreadId); return "Result C"; })
).Finally(() => Console.WriteLine("Done!"));
foreach (string r in await o.FirstAsync())
Console.WriteLine(r);
}
Result
Executing 1st on Thread: 3
Executing 2nd on Thread: 4
Executing 3rd on Thread: 3
Done!
Result A
Result B
Result C
Note Was ForkJoin which is no longer supported. CombineLatest gives the same result.)
This sample starts a background operation that generates a sequence of integers until it is canceled by the main thread. To start the background operation new the Scheduler class is used and a CancellationTokenSource is indirectly created by a Observable.Create.
Please check out the MSDN documentation on System.Threading.CancellationTokenSource to learn more about cancellation source.
IObservable<int> ob =
Observable.Create<int>(o =>
{
var cancel = new CancellationDisposable(); // internally creates a new CancellationTokenSource
NewThreadScheduler.Default.Schedule(() =>
{
int i = 0;
for (; ; )
{
Thread.Sleep(200); // here we do the long lasting background operation
if (!cancel.Token.IsCancellationRequested) // check cancel token periodically
o.OnNext(i++);
else
{
Console.WriteLine("Aborting because cancel event was signaled!");
o.OnCompleted();
return;
}
}
}
);
return cancel;
}
);
IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
Console.WriteLine("Press any key to cancel");
Console.ReadKey();
subscription.Dispose();
Console.WriteLine("Press any key to quit");
Console.ReadKey(); // give background thread chance to write the cancel acknowledge message
class ObserveEvent_Simple
{
public static event EventHandler SimpleEvent;
static void Main()
{
// To consume SimpleEvent as an IObservable:
var eventAsObservable = Observable.FromEventPattern(
ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
}
}
Alternately, you can use EventArgs:
public static event EventHandler<EventArgs> SimpleEvent;
private static void Main(string[] args) {
var eventAsObservable = Observable.FromEventPattern<EventArgs>
(ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
}
class ObserveEvent_Simple
{
public static event EventHandler SimpleEvent;
private static void Main()
{
Console.WriteLine("Setup observable");
// To consume SimpleEvent as an IObservable:
var eventAsObservable = Observable.FromEventPattern(
ev => SimpleEvent += ev,
ev => SimpleEvent -= ev);
// SimpleEvent is null until we subscribe
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.WriteLine("Subscribe");
//Create two event subscribers
var s = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for s subscriber"));
var t = eventAsObservable.Subscribe(args => Console.WriteLine("Received event for t subscriber"));
// After subscribing the event handler has been added
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.WriteLine("Raise event");
if (null != SimpleEvent)
{
SimpleEvent(null, EventArgs.Empty);
}
// Allow some time before unsubscribing or event may not happen
Thread.Sleep(100);
Console.WriteLine("Unsubscribe");
s.Dispose();
t.Dispose();
// After unsubscribing the event handler has been removed
Console.WriteLine(SimpleEvent == null ? "SimpleEvent == null" : "SimpleEvent != null");
Console.ReadKey();
}
}
var mouseMove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove");
mouseMove.ObserveOnDispatcher()
.Subscribe(args => Debug.WriteLine(args.EventArgs.GetPosition(this)));
Note that a reference to System.Reactive.Windows.Threading is required for ObserveOnDispatcher which is in Nuget as Reactive Extensions - Silverlight Helpers.
class ObserveEvent_Generic
{
public class SomeEventArgs : EventArgs { }
public static event EventHandler<SomeEventArgs> GenericEvent;
static void Main()
{
// To consume GenericEvent as an IObservable:
IObservable<EventPattern<SomeEventArgs>> eventAsObservable = Observable.FromEventPattern<SomeEventArgs>(
ev => GenericEvent += ev,
ev => GenericEvent -= ev );
}
}
class ObserveEvent_NonGeneric
{
public class SomeEventArgs : EventArgs { }
public delegate void SomeNonGenericEventHandler(object sender, SomeEventArgs e);
public static event SomeNonGenericEventHandler NonGenericEvent;
static void Main()
{
// To consume NonGenericEvent as an IObservable, first inspect the type of EventArgs used in the second parameter of the delegate.
// In this case, it is SomeEventArgs. Then, use as shown below.
IObservable<IEvent<SomeEventArgs>> eventAsObservable = Observable.FromEvent(
(EventHandler<SomeEventArgs> ev) => new SomeNonGenericEventHandler(ev),
ev => NonGenericEvent += ev,
ev => NonGenericEvent -= ev);
}
}
class Observe_IAsync
{
static void Main()
{
// We will use Stream's BeginRead and EndRead for this sample.
Stream inputStream = Console.OpenStandardInput();
// To convert an asynchronous operation that uses the IAsyncResult pattern to a function that returns an IObservable, use the following format.
// For the generic arguments, specify the types of the arguments of the Begin* method, up to the AsyncCallback.
// If the End* method returns a value, append this as your final generic argument.
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
// Now, you can get an IObservable instead of an IAsyncResult when calling it.
byte[] someBytes = new byte[10];
IObservable<int> observable = read(someBytes, 0, 10);
}
}
Be aware that while the code above formally provides an observable, this is not enough for most intended uses. For more information, see
Creating an observable sequence and
c# - What is the proper way to create an Observable which reads a stream to the end - Stack Overflow.
class Observe_GenericIEnumerable
{
static void Main()
{
IEnumerable<int> someInts = new List<int> { 1, 2, 3, 4, 5 };
// To convert a generic IEnumerable into an IObservable, use the ToObservable extension method.
IObservable<int> observable = someInts.ToObservable();
}
}
class Observe_NonGenericIEnumerableSingleType
{
static void Main()
{
IEnumerable someInts = new object[] { 1, 2, 3, 4, 5 };
// To convert a non-generic IEnumerable that contains elements of a single type,
// first use Cast<> to change the non-generic enumerable into a generic enumerable,
// then use ToObservable.
IObservable<int> observable = someInts.Cast<int>().ToObservable();
}
}
class Observe_Time
{
static void Main()
{
// To observe time passing, use the Observable.Interval function.
// It will notify you on a time interval you specify.
// 0 after 1s, 1 after 2s, 2 after 3s, etc.
IObservable<long> oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> alsoOneNumberPerSecond = Observable.Interval(1000 /* milliseconds */);
}
}
class Where_Simple
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var lowNums = from n in oneNumberPerSecond
where n < 5
select n;
Console.WriteLine("Numbers < 5:");
lowNums.Subscribe(lowNum =>
{
Console.WriteLine(lowNum);
});
Console.ReadKey();
}
}
Result
Numbers < 5:
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
4 (after 5s)
class Where_DrillDown
{
class Customer
{
public Customer() { Orders = new ObservableCollection<Order>(); }
public string CustomerName { get; set; }
public string Region { get; set; }
public ObservableCollection<Order> Orders { get; private set; }
}
class Order
{
public int OrderId { get; set; }
public DateTimeOffset OrderDate { get; set; }
}
static void Main()
{
var customers = new ObservableCollection<Customer>();
var customerChanges = Observable.FromEventPattern(
(EventHandler<NotifyCollectionChangedEventArgs> ev)
=> new NotifyCollectionChangedEventHandler(ev),
ev => customers.CollectionChanged += ev,
ev => customers.CollectionChanged -= ev);
var watchForNewCustomersFromWashington =
from c in customerChanges
where c.EventArgs.Action == NotifyCollectionChangedAction.Add
from cus in c.EventArgs.NewItems.Cast<Customer>().ToObservable()
where cus.Region == "WA"
select cus;
Console.WriteLine("New customers from Washington and their orders:");
watchForNewCustomersFromWashington.Subscribe(cus =>
{
Console.WriteLine("Customer {0}:", cus.CustomerName);
foreach (var order in cus.Orders)
{
Console.WriteLine("Order {0}: {1}", order.OrderId, order.OrderDate);
}
});
customers.Add(new Customer
{
CustomerName = "Lazy K Kountry Store",
Region = "WA",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 1 } }
});
Thread.Sleep(1000);
customers.Add(new Customer
{
CustomerName = "Joe's Food Shop",
Region = "NY",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 2 } }
});
Thread.Sleep(1000);
customers.Add(new Customer
{
CustomerName = "Trail's Head Gourmet Provisioners",
Region = "WA",
Orders = { new Order { OrderDate = DateTimeOffset.Now, OrderId = 3 } }
});
Console.ReadKey();
}
}
Result
New customers from Washington and their orders:
Customer Lazy K Kountry Store: (after 0s)
Order 1: 11/20/2009 11:52:02 AM -06:00
Customer Trail's Head Gourmet Provisioners: (after 2s)
Order 3: 11/20/2009 11:52:04 AM -06:00
class Select_Simple
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var numbersTimesTwo = from n in oneNumberPerSecond
select n * 2;
Console.WriteLine("Numbers * 2:");
numbersTimesTwo.Subscribe(num =>
{
Console.WriteLine(num);
});
Console.ReadKey();
}
}
Result
Numbers * 2:
0 (after 1s)
2 (after 2s)
4 (after 3s)
6 (after 4s)
8 (after 5s)
class Select_Transform
{
static void Main()
{
var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));
var stringsFromNumbers = from n in oneNumberPerSecond
select new string('*', (int)n);
Console.WriteLine("Strings from numbers:");
stringsFromNumbers.Subscribe(num =>
{
Console.WriteLine(num);
});
Console.ReadKey();
}
}
Result
Strings from numbers:
(after 0s)
(after 1s)
(after 2s)
(after 3s)
*** (after 4s)
(after 5s)
(after 6s)
class Where_Indexed
{
class TimeIndex
{
public TimeIndex(int index, DateTimeOffset time)
{
Index = index;
Time = time;
}
public int Index { get; set; }
public DateTimeOffset Time { get; set; }
}
static void Main()
{
var clock = Observable.Interval(TimeSpan.FromSeconds(1))
.Select((t, index) => new TimeIndex(index, DateTimeOffset.Now));
clock.Subscribe(timeIndex =>
{
Console.WriteLine(
"Ding dong. The time is now {0:T}. This is event number {1}.",
timeIndex.Time,
timeIndex.Index);
});
Console.ReadKey();
}
}
Result
Ding dong. The time is now 1:55:00 PM. This is event number 0. (after 0s)
Ding dong. The time is now 1:55:01 PM. This is event number 1. (after 1s)
Ding dong. The time is now 1:55:02 PM. This is event number 2. (after 2s)
Ding dong. The time is now 1:55:03 PM. This is event number 3. (after 3s)
Ding dong. The time is now 1:55:04 PM. This is event number 4. (after 4s)
Ding dong. The time is now 1:55:05 PM. This is event number 5. (after 5s)
This example counts how many time you press each key as you furiously hit the keyboard. :)
class GroupBy_Simple
{
static IEnumerable<ConsoleKeyInfo> KeyPresses()
{
for (; ; )
{
var currentKey = Console.ReadKey(true);
if (currentKey.Key == ConsoleKey.Enter)
yield break;
else
yield return currentKey;
}
}
static void Main()
{
var timeToStop = new ManualResetEvent(false);
var keyPresses = KeyPresses().ToObservable();
var groupedKeyPresses =
from k in keyPresses
group k by k.Key into keyPressGroup
select keyPressGroup;
Console.WriteLine("Press Enter to stop. Now bang that keyboard!");
groupedKeyPresses.Subscribe(keyPressGroup =>
{
int numberPresses = 0;
keyPressGroup.Subscribe(keyPress =>
{
Console.WriteLine(
"You pressed the {0} key {1} time(s)!",
keyPress.Key,
++numberPresses);
},
() => timeToStop.Set());
});
timeToStop.WaitOne();
}
}
Result
Depends on what you press! But something like:
Press Enter to stop. Now bang that keyboard!
You pressed the A key 1 time(s)!
You pressed the A key 2 time(s)!
You pressed the B key 1 time(s)!
You pressed the B key 2 time(s)!
You pressed the C key 1 time(s)!
You pressed the C key 2 time(s)!
You pressed the C key 3 time(s)!
You pressed the A key 3 time(s)!
You pressed the B key 3 time(s)!
You pressed the A key 4 time(s)!
You pressed the A key 5 time(s)!
You pressed the C key 4 time(s)!
Buffer has a strange name, but a simple concept.
Imagine an email program that checks for new mail every 5 minutes. While you can receive mail at any instant in time, you only get a batch of emails at every five minute mark.
Let's use Buffer to simulate this.
class Buffer_Simple
{
static IEnumerable<string> EndlessBarrageOfEmail()
{
var random = new Random();
var emails = new List<String> { "Here is an email!", "Another email!", "Yet another email!" };
for (; ; )
{
// Return some random emails at random intervals.
yield return emails[random.Next(emails.Count)];
Thread.Sleep(random.Next(1000));
}
}
static void Main()
{
var myInbox = EndlessBarrageOfEmail().ToObservable();
// Instead of making you wait 5 minutes, we will just check every three seconds instead. :)
var getMailEveryThreeSeconds = myInbox.Buffer(TimeSpan.FromSeconds(3)); // Was .BufferWithTime(...
getMailEveryThreeSeconds.Subscribe(emails =>
{
Console.WriteLine("You've got {0} new messages! Here they are!", emails.Count());
foreach (var email in emails)
{
Console.WriteLine("> {0}", email);
}
Console.WriteLine();
});
Console.ReadKey();
}
}
Result
You've got 5 new messages! Here they are! (after 3s)
> Here is an email!
> Another email!
> Here is an email!
> Another email!
> Here is an email!
You've got 6 new messages! Here they are! (after 6s)
> Another email!
> Another email!
> Here is an email!
> Here is an email!
> Another email!
> Another email!
class Delay_Simple
{
static void Main()
{
var oneNumberEveryFiveSeconds = Observable.Interval(TimeSpan.FromSeconds(5));
// Instant echo
oneNumberEveryFiveSeconds.Subscribe(num =>
{
Console.WriteLine(num);
});
// One second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).Subscribe(num =>
{
Console.WriteLine("...{0}...", num);
});
// Two second delay
oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).Subscribe(num =>
{
Console.WriteLine("......{0}......", num);
});
Console.ReadKey();
}
}
Result
0 (after 5s)
…0… (after 6s)
……0…… (after 7s)
1 (after 10s)
…1… (after 11s)
……1…… (after 12s)
internal class Interval_Simple
{
private static void Main()
{
IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));
using (observable.Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
…
internal class Sample_Simple
{
private static void Main()
{
// Generate sequence of numbers, (an interval of 50 ms seems to result in approx 16 per second).
IObservable<long> observable = Observable.Interval(TimeSpan.FromMilliseconds(50));
// Sample the sequence every second
using (observable.Sample(TimeSpan.FromSeconds(1)).Timestamp().Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
15: 24/11/2009 15:40:45 (after 1s)
31: 24/11/2009 15:40:46 (after 2s)
47: 24/11/2009 15:40:47 (after 3s)
64: 24/11/2009 15:40:48 (after 4s)
…
Throttle stops the flow of events until no more events are produced for a specified period of time. For example, if you throttle a TextChanged event of a textbox to .5 seconds, no events will be passed until the user has stopped typing for .5 seconds. This is useful in search boxes where you do not want to start a new search after every keystroke, but want to wait until the user pauses.
SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher().Subscribe(e => this.ListItems.Add(this.textBox.Text));
Here is another example:
internal class Throttle_Simple
{
// Generates events with interval that alternates between 500ms and 1000ms every 5 events
static IEnumerable<int> GenerateAlternatingFastAndSlowEvents()
{
int i = 0;
while(true)
{
if(i > 1000)
{
yield break;
}
yield return i;
Thread.Sleep( i++ % 10 < 5 ? 500 : 1000);
}
}
private static void Main()
{
var observable = GenerateAlternatingFastAndSlowEvents().ToObservable().Timestamp();
var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750));
using (throttled.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
5:
6:
7:
8:
9:
15:
16:
17:
18:
19:
…etc
internal class TimeInterval_Simple
{
// Like TimeStamp but gives the time-interval between successive values
private static void Main()
{
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 00:00:00.8090459 (1st value)
1: 00:00:00.7610435 (2nd value)
2: 00:00:00.7650438 (3rd value)
…
internal class TimeInterval_Remove
{
private static void Main()
{
// Add a time interval
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();
// Remove it again
using (observable.RemoveTimeInterval().Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0
1
2
…
internal class Timeout_Simple
{
private static void Main()
{
Console.WriteLine(DateTime.Now);
// create a single event in 10 seconds time
var observable = Observable.Timer(TimeSpan.FromSeconds(10)).Timestamp();
// raise exception if no event received within 9 seconds
var observableWithTimeout = Observable.Timeout(observable, TimeSpan.FromSeconds(9));
using (observableWithTimeout.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp),
ex => Console.WriteLine("{0} {1}", ex.Message, DateTime.Now)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
02/12/2009 10:13:00
Press any key to unsubscribe
The operation has timed out. 02/12/2009 10:13:09
…
Observable.Interval is a simple wrapper around Observable.Timer.
internal class Timer_Simple
{
private static void Main()
{
Console.WriteLine(DateTime.Now);
var observable = Observable.Timer(TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(1)).Timestamp();
// or, equivalently
// var observable = Observable.Timer(DateTime.Now + TimeSpan.FromSeconds(5),
// TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
02/12/2009 10:02:29
Press any key to unsubscribe
0: 02/12/2009 10:02:34(after 5s)
1: 02/12/2009 10:02:35 (after 6s)
2: 02/12/2009 10:02:36 (after 7s)
…
Adds a TimeStamp to each element using the system's local time.
internal class Timestamp_Simple
{
private static void Main()
{
var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(
x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0: 24/11/2009 15:40:45 (after 1s)
1: 24/11/2009 15:40:46 (after 2s)
2: 24/11/2009 15:40:47 (after 3s)
3: 24/11/2009 15:40:48 (after 4s)
…
internal class Timestamp_Remove
{
private static void Main()
{
// Add timestamp
var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();
// Remove it
using (observable.RemoveTimestamp().Subscribe(Console.WriteLine))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
0 (after 1s)
1 (after 2s)
2 (after 3s)
3 (after 4s)
…
Divides a stream into "Windows" of time. For example, 5 five second window would contain all elements pushed in that five second interval.
IObservable<long> mainSequence = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<IObservable<long>> seqWindowed = mainSequence.Window(() =>
{
IObservable<long> seqWindowControl = Observable.Interval(TimeSpan.FromSeconds(6));
return seqWindowControl;
});
seqWindowed.Subscribe(seqWindow =>
{
Console.WriteLine("\nA new window into the main sequence has opened: {0}\n",
DateTime.Now.ToString());
seqWindow.Subscribe(x => { Console.WriteLine("Integer : {0}", x); });
});
Console.ReadLine();
var leftList = new List<string[]>();
leftList.Add(new string[] { "2013-01-01 02:00:00", "Batch1" });
leftList.Add(new string[] { "2013-01-01 03:00:00", "Batch2" });
leftList.Add(new string[] { "2013-01-01 04:00:00", "Batch3" });
var rightList = new List<string[]>();
rightList.Add(new string[] { "2013-01-01 01:00:00", "Production=2" });
rightList.Add(new string[] { "2013-01-01 02:00:00", "Production=0" });
rightList.Add(new string[] { "2013-01-01 03:00:00", "Production=3" });
var l = leftList.ToObservable();
var r = rightList.ToObservable();
var q = l.GroupJoin(r,
_ => Observable.Never<Unit>(), // windows from each left event going on forever
_ => Observable.Never<Unit>(), // windows from each right event going on forever
(left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events
// e is a tuple with two items, left and obsOfRight
q.Subscribe(e =>
{
var xs = e.Item2;
xs.Where(
x => x[0] == e.Item1[0]) // filter only when datetime matches
.Subscribe(
v =>
{
Console.WriteLine(
string.Format("{0},{1} and {2},{3} occur at the same time",
e.Item1[0],
e.Item1[1],
v[0],
v[1]
));
});
});
Generates a Range of values. Useful for testing purposes.
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
There are several overloads for Generate.
A simple use is to replicate Interval but have the sequence stop.
internal class Generate_Simple
{
private static void Main()
{
var observable =
Observable.Generate(1, x => x < 6, x => x + 1, x => x,
x=>TimeSpan.FromSeconds(1)).Timestamp();
using (observable.Subscribe(x => Console.WriteLine("{0}, {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
Result
1: 24/11/2009 15:40:45 (after 1s)
2: 24/11/2009 15:40:46 (after 2s)
3: 24/11/2009 15:40:47 (after 3s)
4: 24/11/2009 15:40:48 (after 4s)
5: 24/11/2009 15:40:49 (after 5s)
There are several implementations for ISubject.
using System;
using System.Collections.Generic;
using System.Linq;
namespace RxPingPong
{
/// <summary>Simple Ping Pong Actor model using Rx </summary>
/// <remarks>
/// You'll need to install the Reactive Extensions (Rx) for this to work.
/// You can get the installer from <see href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx"/>
/// </remarks>
class Program
{
static void Main(string[] args)
{
var ping = new Ping();
var pong = new Pong();
Console.WriteLine("Press any key to stop ...");
var pongSubscription = ping.Subscribe(pong);
var pingSubscription = pong.Subscribe(ping);
Console.ReadKey();
pongSubscription.Dispose();
pingSubscription.Dispose();
Console.WriteLine("Ping Pong has completed.");
}
}
class Ping : ISubject<Pong, Ping>
{
#region Implementation of IObserver<Pong>
/// <summary>
/// Notifies the observer of a new value in the sequence.
/// </summary>
public void OnNext(Pong value)
{
Console.WriteLine("Ping received Pong.");
}
/// <summary>
/// Notifies the observer that an exception has occurred.
/// </summary>
public void OnError(Exception exception)
{
Console.WriteLine("Ping experienced an exception and had to quit playing.");
}
/// <summary>
/// Notifies the observer of the end of the sequence.
/// </summary>
public void OnCompleted()
{
Console.WriteLine("Ping finished.");
}
#endregion
#region Implementation of IObservable<Ping>
/// <summary>
/// Subscribes an observer to the observable sequence.
/// </summary>
public IDisposable Subscribe(IObserver<Ping> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(2))
.Where(n => n < 10)
.Select(n => this)
.Subscribe(observer);
}
#endregion
#region Implementation of IDisposable
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
OnCompleted();
}
#endregion
}
class Pong : ISubject<Ping, Pong>
{
#region Implementation of IObserver<Ping>
/// <summary>
/// Notifies the observer of a new value in the sequence.
/// </summary>
public void OnNext(Ping value)
{
Console.WriteLine("Pong received Ping.");
}
/// <summary>
/// Notifies the observer that an exception has occurred.
/// </summary>
public void OnError(Exception exception)
{
Console.WriteLine("Pong experienced an exception and had to quit playing.");
}
/// <summary>
/// Notifies the observer of the end of the sequence.
/// </summary>
public void OnCompleted()
{
Console.WriteLine("Pong finished.");
}
#endregion
#region Implementation of IObservable<Pong>
/// <summary>
/// Subscribes an observer to the observable sequence.
/// </summary>
public IDisposable Subscribe(IObserver<Pong> observer)
{
return Observable.Interval(TimeSpan.FromSeconds(1.5))
.Where(n => n < 10)
.Select(n => this)
.Subscribe(observer);
}
#endregion
#region Implementation of IDisposable
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
OnCompleted();
}
#endregion
}
}
Result
1: Ping received Pong.
2: Pong received Ping.
3: Ping received Pong.
4: Pong received Ping.
5: Ping received Pong.
The Merge operator combine two or more sequences. In the following example, the two streams are merged into one so that both are printed with one subscription. Also note the use of "using" to wrap the Observable, thus ensuring the subscription is Disposed.
class Merge
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {1, 2, 2, 2, 2}); }
}
private static IObservable<int> Ys
{
get { return Generate(100, new List<int> {2, 2, 2, 2, 2}); }
}
private static IObservable<int> Generate(int initialValue, IList<int> intervals)
{
// work-around for Observable.Generate calling timeInterval before resultSelector
intervals.Add(0);
return Observable.Generate(initialValue,
x => x < initialValue + intervals.Count - 1,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(intervals[x - initialValue]));
}
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.Merge(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
0: 11/12/2009 12:17:44
100: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
101: 11/12/2009 12:17:47
2: 11/12/2009 12:17:48
102: 11/12/2009 12:17:49
3: 11/12/2009 12:17:50
103: 11/12/2009 12:17:51
4: 11/12/2009 12:17:52
104: 11/12/2009 12:17:53
class Publish
{
private static void Main()
{
var unshared = Observable.Range(1, 4);
// Each subscription starts a new sequence
unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #1: " + i));
unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #2: " + i));
Console.WriteLine();
// By using publish the subscriptions are shared, but the sequence doesn't start until Connect() is called.
var shared = unshared.Publish();
shared.Subscribe(i => Console.WriteLine("Shared Subscription #1: " + i));
shared.Subscribe(i => Console.WriteLine("Shared Subscription #2: " + i));
shared.Connect();
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
Unshared Subscription #1: 1
Unshared Subscription #1: 2
Unshared Subscription #1: 3
Unshared Subscription #1: 4
Unshared Subscription #2: 1
Unshared Subscription #2: 2
Unshared Subscription #2: 3
Unshared Subscription #2: 4
Shared Subscription #1: 1
Shared Subscription #2: 1
Shared Subscription #1: 2
Shared Subscription #2: 2
Shared Subscription #1: 3
Shared Subscription #2: 3
Shared Subscription #1: 4
Shared Subscription #2: 4
class Zip
{
// same code as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.Zip(Ys, (x, y) => x + y).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
100: 11/12/2009 12:17:45
102: 11/12/2009 12:17:47
104: 11/12/2009 12:17:49
106: 11/12/2009 12:17:51
108: 11/12/2009 12:17:53
class CombineLatest
{
// same code as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
using (Xs.CombineLatest(Ys, (x, y) => x + y).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
100: 11/12/2009 12:17:45
101: 11/12/2009 12:17:46
102: 11/12/2009 12:17:47
103: 11/12/2009 12:17:48
104: 11/12/2009 12:17:49
105: 11/12/2009 12:17:50
106: 11/12/2009 12:17:51
107: 11/12/2009 12:17:52
108: 11/12/2009 12:17:53
class ConcatCold
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {0, 1, 1}); }
}
private static IObservable<int> Ys
{
get { return Generate(100, new List<int> {1, 1, 1}); }
}
// same Generate() method as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
Console.WriteLine(DateTime.Now);
using (Xs.Concat(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
100: 11/12/2009 12:17:48
101: 11/12/2009 12:17:49
102: 11/12/2009 12:17:50
class ConcatHot
{
private static IObservable<int> Xs
{
get { return Generate(0, new List<int> {0, 1, 1}); }
}
private static IObservable<int> Ys
{
get { return Generate(100, new List<int> {1, 1, 1}).Publish(); }
}
// same Generate() method as above for Merge...
private static void Main()
{
Console.WriteLine("Press any key to unsubscribe");
Console.WriteLine(DateTime.Now);
using (Xs.Concat(Ys).Timestamp().Subscribe(
z => Console.WriteLine("{0,3}: {1}", z.Value, z.Timestamp),
() => Console.WriteLine("Completed, press a key")))
{
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
result
0: 11/12/2009 12:17:45
1: 11/12/2009 12:17:46
2: 11/12/2009 12:17:47
102: 11/12/2009 12:17:48
If you are about to build new system, you could consider using just IObservable.
class UseSubject
{
public class Order
{
private DateTime? _paidDate;
private readonly Subject<Order> _paidSubj = new Subject<Order>();
public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }
public void MarkPaid(DateTime paidDate)
{
_paidDate = paidDate;
_paidSubj.OnNext(this); // Raise PAID event
}
}
private static void Main()
{
var order = new Order();
order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe
order.MarkPaid(DateTime.Now);
}
}
@omnibs I realize that this is not your documentation, but I thought I'd ping you just in case you're interested in this discussion...
I've been delving into various aspects of the rx extensions. The thing that is driving me crazy is that when you look at the code, you would think that Observable.Create would create just one long running task. It might start when the subscription starts, but you'd think that you could subscribe as many times as you like without spawning more and more tasks.
However, if you look at this sample, you see that it actually creates a new IObserver each time the Subscribe method is called. So if ten things subscribe, there will be ten loops running. You can try it our yourself but subscribing more than once.
I'm scratching my head on why the library is designed like this. I'm trying to do this with the Rx extensions. But, I just cannot figure out how to achieve the same result with the extensions.
If you have any ideas, I would really appreciate them.
Thanks