Created
June 5, 2013 05:04
-
-
Save NuclearFishin/5711723 to your computer and use it in GitHub Desktop.
My example of a "metric stream": I have a main thread doing work, and I want to push metrics asynchronously to an external server at a reasonable buffered interval. Seems like the perfect task for Rx :) ...but if I'm not careful, the main thread will tear-down the app before the subscriber finishes its work!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Reactive.Subjects; | |
using System.Reactive.Linq; | |
using System.Threading; | |
namespace MetricPushDemo | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
// Scenario: main thread is generating metrics that I want to push asynchronously to an external system | |
var subj = new Subject<int>(); // my metric sequence | |
// Option 1: standard subscribe is not guaranteed to complete before the main app shuts down | |
var option1 = subj | |
.ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async | |
.Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently | |
.Where(m => m.Any()) | |
.Subscribe(m => | |
{ | |
Thread.Sleep(1000); // simulate slow round-trip | |
Console.WriteLine("Option 1 pushed {0} metrics to external system", m.Count()); | |
}); | |
// Option 2 works for me: it returns a task that we can await at the end | |
//var option2 = subj | |
// .ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async | |
// .Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently | |
// .Where(m => m.Any()) | |
// .ForEachAsync(m => | |
// { | |
// Thread.Sleep(1000); // simulate slow round-trip | |
// Console.WriteLine("Option 2 pushed {0} metrics to external system", m.Count()); | |
// }); | |
Console.WriteLine("Do some work"); | |
subj.OnNext(1); | |
Console.WriteLine("Do some more work"); | |
subj.OnNext(2); | |
Console.WriteLine("And some more"); | |
subj.OnNext(3); | |
Console.WriteLine("done"); | |
subj.OnCompleted(); | |
//option2.Wait(); // wait for option 2 to complete | |
// I welcome your thoughts! :) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment