Skip to content

Instantly share code, notes, and snippets.

@NuclearFishin
Created June 5, 2013 05:04
Show Gist options
  • Save NuclearFishin/5711723 to your computer and use it in GitHub Desktop.
Save NuclearFishin/5711723 to your computer and use it in GitHub Desktop.
My example of a "metric stream": I have a main thread doing work, and I want to push metrics asynchronously to an external server at a reasonable buffered interval. Seems like the perfect task for Rx :) ...but if I'm not careful, the main thread will tear-down the app before the subscriber finishes its work!
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Threading;
namespace MetricPushDemo
{
class Program
{
static void Main(string[] args)
{
// Scenario: main thread is generating metrics that I want to push asynchronously to an external system
var subj = new Subject<int>(); // my metric sequence
// Option 1: standard subscribe is not guaranteed to complete before the main app shuts down
var option1 = subj
.ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async
.Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently
.Where(m => m.Any())
.Subscribe(m =>
{
Thread.Sleep(1000); // simulate slow round-trip
Console.WriteLine("Option 1 pushed {0} metrics to external system", m.Count());
});
// Option 2 works for me: it returns a task that we can await at the end
//var option2 = subj
// .ObserveOn(System.Reactive.Concurrency.Scheduler.Default) // make async
// .Buffer(TimeSpan.FromSeconds(3)) // buffer so we don't push too frequently
// .Where(m => m.Any())
// .ForEachAsync(m =>
// {
// Thread.Sleep(1000); // simulate slow round-trip
// Console.WriteLine("Option 2 pushed {0} metrics to external system", m.Count());
// });
Console.WriteLine("Do some work");
subj.OnNext(1);
Console.WriteLine("Do some more work");
subj.OnNext(2);
Console.WriteLine("And some more");
subj.OnNext(3);
Console.WriteLine("done");
subj.OnCompleted();
//option2.Wait(); // wait for option 2 to complete
// I welcome your thoughts! :)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment