Skip to content

Instantly share code, notes, and snippets.

@agross
Created November 25, 2014 19:43
Show Gist options
  • Save agross/c53f5c54fd8c6e163daf to your computer and use it in GitHub Desktop.
Save agross/c53f5c54fd8c6e163daf to your computer and use it in GitHub Desktop.
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace RxWithhold
{
class Program
{
    // Console demo: a one-second tick stream is gated by a manually toggled
    // "connection" state. Every key press publishes the next state (true,
    // false, true, ...) on a subject. Each published state closes the current
    // window over the tick stream and opens a new one, which is either
    // forwarded live (state is true) or buffered (state is false).
    [STAThread]
    static void Main(string[] args)
    {
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1));
        var connection = new Subject<bool>();

        // Mirror of the most recently published connection state. This
        // subscription is registered before the windowing subscription so the
        // flag is already updated when the next window's handler reads it.
        var connected = false;
        connection.Subscribe(state => connected = state);

        ticks
            .Window(() => connection)
            .Subscribe(window => HandleWindow(window, connection, connected));

        // Registered after the windowing subscription, so this line is
        // printed after the window hand-over output for the same change.
        connection.Subscribe(state => Console.WriteLine("Connection changed: {0}", state));

        // Runs forever: each key press publishes the next state, starting
        // with true and alternating afterwards.
        for (var next = true; ; next = !next)
        {
            Console.ReadKey();
            connection.OnNext(next);
        }
    }

    // Decides how the ticks of a freshly opened window are delivered:
    // directly to ProcessNotification when online, otherwise collected by
    // Buffer and replayed to ProcessNotification once the buffer is emitted.
    static void HandleWindow(IObservable<long> window, IObservable<bool> connection, bool online)
    {
        Console.Write("Starting new window: ");

        if (online)
        {
            Console.WriteLine("Forwarding notifications");
            window.Subscribe(ProcessNotification);
            return;
        }

        Console.WriteLine("Buffering until online");

        // The buffer boundary is the next "true" published on the connection.
        var cameOnline = connection.Where(state => state);
        window
            .Buffer(() => cameOnline)
            .Subscribe(batch => batch.ToObservable().Subscribe(ProcessNotification));
    }

    // Prints a single tick value on its own line.
    static void ProcessNotification(long notification)
    {
        Console.WriteLine(notification);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment