-
-
Save anaisbetts/5711743 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Reactive.Subjects; | |
using System.Reactive.Linq; | |
using System.Threading; | |
namespace MetricPushDemo | |
{ | |
class Program | |
{ | |
IObservable<Unit> PushMetricToExternalSystem(int) | |
{ | |
return someMethodThatReturnsTaskOfT().ToObservable(); | |
//or | |
return Observable.Start(() => { | |
Thread.Sleep(1000); | |
return Unit.Default; // Unit is like Void, just saying it finished | |
}); | |
} | |
static void Main(string[] args) | |
{ | |
// Scenario: main thread is generating metrics that I want to push asynchronously to an external system | |
var subj = new Subject<int>(); // my metric sequence | |
var option1 = subj | |
.ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async | |
.Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently | |
.Where(m => m.Any()) | |
.SelectMany(m => PushMetricToExternalSystem(m)) | |
.Subscribe(m => | |
{ | |
Console.WriteLine("Option 1 pushed {0} metrics to external system", m.Count()); | |
}); | |
Console.WriteLine("Do some work"); | |
subj.OnNext(1); | |
Console.WriteLine("Do some more work"); | |
subj.OnNext(2); | |
Console.WriteLine("And some more"); | |
subj.OnNext(3); | |
Console.WriteLine("done"); | |
subj.OnCompleted(); | |
// Now when we wait, we wait until *after* the metric is pushed | |
option1.Wait(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey Paul, in this case the tasks will correctly run on the ThreadPool but there's no guarantee the subscriber must complete before the end? The app will still tear itself down before the subscriber completes.
Unfortunately option1.Wait() doesn't exist as option1 is only IDisposable