Skip to content

Instantly share code, notes, and snippets.

@wldevries
Created April 17, 2016 10:59
Show Gist options
  • Save wldevries/4521ea48212813ce81bb616e9f326062 to your computer and use it in GitHub Desktop.
Save wldevries/4521ea48212813ce81bb616e9f326062 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
namespace PacketLengthRx
{
/// <summary>
/// Console entry point for the packet-length demo.
/// </summary>
class Program
{
    /// <summary>
    /// Runs the packet-length demo once, then waits for a key press so the
    /// console output can be read before the process exits.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var demo = new PacketLengthTests();
        demo.Run();

        // Block until a key is pressed so the console window stays open.
        Console.ReadKey();
    }
}
/// <summary>
/// Demonstrates splitting a stream of integers into length-prefixed packets
/// using Rx windows.
/// </summary>
/// <remarks>
/// The first value of every packet is treated as a header holding the total
/// number of values in that packet, the header itself included.
/// </remarks>
public class PacketLengthTests : ReactiveTest
{
    /// <summary>
    /// Replays a fixed sequence of integers on a <see cref="TestScheduler"/>,
    /// groups the values into length-prefixed packets and writes every
    /// completed packet to the console as one space-separated line.
    /// </summary>
    /// <remarks>
    /// For the sample sequence the expected output is "3 4 5", "3 1 0" and
    /// "2 17". The sample contains no completion notification, so the window
    /// opened after the last packet never completes and prints nothing.
    /// </remarks>
    public void Run()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateHotObservable<int>(
            OnNext(0, 3),
            OnNext(1, 4),
            OnNext(2, 5),
            OnNext(3, 3),
            OnNext(4, 1),
            OnNext(5, 0),
            OnNext(6, 2),
            OnNext(7, 17));

        // Publish lets the packet counter and the windowing operator share a
        // single subscription to the source.
        var packets = xs.Publish(ps =>
        {
            // Track how many values of the current packet have been seen
            // (count) and how many the packet's header announced (length).
            // Once the previous packet is full, the incoming value is the
            // header of the next packet.
            var boundaries = ps.Scan(
                new { count = -1, length = -1 },
                (acc, v) => acc.count >= acc.length
                    ? new { count = 1, length = v }
                    : new { count = acc.count + 1, length = acc.length });

            // Close the current window as soon as the packet is full. The
            // comparison is ">=" so it agrees with the reset condition above:
            // a header announcing a length of zero or less then closes its
            // own window instead of silently merging into the next packet.
            return ps.Window(boundaries.Where(b => b.count >= b.length));
        });

        // Collect every window into a list inside the Rx pipeline rather than
        // in an async void callback, so failures reach the subscriber's error
        // path instead of escaping as unobserved exceptions.
        using (packets
            .SelectMany(window => window.ToList())
            .Subscribe(items => Console.WriteLine(string.Join(" ", items))))
        {
            // Drive the virtual-time scheduler so the recorded values are emitted.
            scheduler.Start();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment