Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Forked from NuclearFishin/gist:5711723
Last active December 18, 2015 02:29
Show Gist options
  • Save anaisbetts/5711743 to your computer and use it in GitHub Desktop.
Save anaisbetts/5711743 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Threading;
namespace MetricPushDemo
{
class Program
{
IObservable<Unit> PushMetricToExternalSystem(int)
{
return someMethodThatReturnsTaskOfT().ToObservable();
//or
return Observable.Start(() => {
Thread.Sleep(1000);
return Unit.Default; // Unit is like Void, just saying it finished
});
}
static void Main(string[] args)
{
// Scenario: main thread is generating metrics that I want to push asynchronously to an external system
var subj = new Subject<int>(); // my metric sequence
var option1 = subj
.ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async
.Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently
.Where(m => m.Any())
.SelectMany(m => PushMetricToExternalSystem(m))
.Subscribe(m =>
{
Console.WriteLine("Option 1 pushed {0} metrics to external system", m.Count());
});
Console.WriteLine("Do some work");
subj.OnNext(1);
Console.WriteLine("Do some more work");
subj.OnNext(2);
Console.WriteLine("And some more");
subj.OnNext(3);
Console.WriteLine("done");
subj.OnCompleted();
// Now when we wait, we wait until *after* the metric is pushed
option1.Wait();
}
}
}
@NuclearFishin
Copy link

Hey Paul, in this case the tasks will correctly run on the ThreadPool but there's no guarantee the subscriber must complete before the end? The app will still tear itself down before the subscriber completes.

Unfortunately option1.Wait() doesn't exist as option1 is only IDisposable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment