-
-
Save chgeuer/b76d5711afb593611809eed8f227cfb1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace RxCheckProviderCount | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Collections.ObjectModel; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
/// <summary>
/// A single reply travelling through the demo pipeline: which provider sent it,
/// which request it answers, and an integer payload.
/// </summary>
class Message
{
    /// <summary>
    /// Creates a message and stores the three values in the matching properties.
    /// </summary>
    /// <param name="providerId">Identifier of the provider that produced the message.</param>
    /// <param name="requestId">Identifier of the request the message belongs to.</param>
    /// <param name="payload">Integer payload carried by the message.</param>
    public Message(string providerId, string requestId, int payload)
    {
        this.ProviderID = providerId;
        this.RequestID = requestId;
        this.Payload = payload;
    }

    /// <summary>Identifier of the provider that produced the message.</summary>
    public string ProviderID { get; set; }

    /// <summary>Identifier of the request the message belongs to.</summary>
    public string RequestID { get; set; }

    /// <summary>Integer payload carried by the message.</summary>
    public int Payload { get; set; }

    /// <summary>
    /// Renders the message as "ProviderID RequestID Payload", separated by single spaces.
    /// </summary>
    /// <returns>The space-separated textual form of the message.</returns>
    public override string ToString()
    {
        return string.Format("{0} {1} {2}", this.ProviderID, this.RequestID, this.Payload);
    }
}
/// <summary>
/// Console demo: collects replies for one request until either the expected number of
/// distinct providers has answered or a timeout elapses, then runs a small self-check
/// of <see cref="ProviderCounter{T}"/>.
/// </summary>
class Program
{
    /// <summary>
    /// Builds a timed stream of sample messages, processes the replies for request
    /// "req1" until three distinct providers have replied (or 800 ms have passed),
    /// verifies the counter's firing behaviour, and waits for Enter before exiting.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the <see cref="ProviderCounter{T}"/> self-check observes the counter
    /// firing too early or not firing at all.
    /// </exception>
    static void Main()
    {
        var (p1, p2, p3, p4) = ("provider1", "provider2", "provider3", "provider4");
        var (r1, r2) = ("req1", "req2");
        var timeout = TimeSpan.FromMilliseconds(800);

        // Sample traffic: each message is emitted after its own delay.
        var observable =
            new Message(p1, r1, 1).EmitIn(TimeSpan.FromMilliseconds(50))
            .And(new Message(p1, r2, 2)).In(TimeSpan.FromMilliseconds(60))
            .And(new Message(p2, r1, 3)).In(TimeSpan.FromMilliseconds(70))
            .And(new Message(p3, r1, 4)).In(TimeSpan.FromMilliseconds(80))
            .And(new Message(p4, r1, 5)).In(TimeSpan.FromMilliseconds(500));

        var counter = new ProviderCounter<string>(expectedNumberOfProviders: 3);
        var responseMustBeReadyBy = DateTimeOffset.Now.Add(timeout);
        var waitedLongEnough = Observable.Timer(dueTime: responseMustBeReadyBy).Select(_ => 1);
        var whenExpiredOrAllProvidersReplied = waitedLongEnough.Merge(counter.Observable);

        // Lambda parameters are deliberately not named like any local of Main:
        // reusing a local's name is error CS0136 on compilers older than C# 8.
        var subscription = observable
            .Where(m => m.RequestID == r1)
            .Do(m => Console.WriteLine($"Saw a {m}"))
            .TakeUntil(whenExpiredOrAllProvidersReplied)
            .Subscribe(
                onNext: m =>
                {
                    Console.WriteLine($"Process {m.ProviderID} {m.RequestID} {m.Payload} ");

                    // Count the provider only AFTER its reply has been processed.
                    // Counting upstream of TakeUntil made the reply that completes
                    // the set trigger the stop signal before it reached this handler,
                    // so that reply was dropped.
                    counter.Add(m.ProviderID);
                },
                onCompleted: () => Console.WriteLine("Done"));

        // Self-check: the counter must fire only once three DISTINCT values were added.
        var thingFired = false;

        void Expect(bool shouldHaveFired, string step)
        {
            if (thingFired != shouldHaveFired)
            {
                throw new InvalidOperationException(
                    $"ProviderCounter self-check failed {step}: fired={thingFired}, expected={shouldHaveFired}.");
            }
        }

        var c = new ProviderCounter<int>(3);
        c.Observable.Subscribe(onNext: _ => thingFired = true);
        Expect(false, "before any Add");

        // Only two distinct values so far; duplicates must not count.
        foreach (var value in new[] { 1, 2, 2, 2, 1, 2 })
        {
            c.Add(value);
            Expect(false, $"after Add({value})");
        }

        c.Add(3);
        Expect(true, "after Add(3)");

        Console.WriteLine("We're octoawesome");
        Console.ReadLine();

        // Release the pipeline (and any timers still pending) before exiting.
        subscription.Dispose();
    }
}
/// <summary>
/// Collects distinct values of <typeparamref name="T"/> and signals through
/// <see cref="Observable"/> once the number of distinct values reaches the
/// expected count.
/// </summary>
/// <typeparam name="T">Type of the value that identifies a provider.</typeparam>
public class ProviderCounter<T>
{
    private readonly int expectedCount;
    private readonly HashSet<T> coll = new HashSet<T>();
    private readonly object _lock = new object();
    private readonly Subject<int> subject;

    // Set once the threshold notification has been claimed; guarded by _lock.
    private bool signalled;

    /// <summary>
    /// Emits the distinct-value count a single time when the expected count is
    /// reached, and then completes.
    /// </summary>
    public IObservable<int> Observable { get; }

    /// <summary>
    /// Creates a counter that signals once <paramref name="expectedNumberOfProviders"/>
    /// distinct values have been added.
    /// </summary>
    /// <param name="expectedNumberOfProviders">
    /// Number of distinct values required before <see cref="Observable"/> fires.
    /// </param>
    public ProviderCounter(int expectedNumberOfProviders)
    {
        this.expectedCount = expectedNumberOfProviders;
        this.subject = new Subject<int>();
        this.Observable = this.subject.AsObservable();
    }

    /// <summary>
    /// Records <paramref name="t"/>; duplicates are ignored by the underlying set.
    /// The first call that brings the distinct count to the expected count (or above)
    /// publishes that count on <see cref="Observable"/> and completes it. Later calls
    /// still record the value but do not signal again.
    /// </summary>
    /// <param name="t">The value to record.</param>
    public void Add(T t)
    {
        int count;

        // HashSet<T> is not safe for concurrent writers, so the mutation and the
        // threshold decision happen under the lock.
        lock (this._lock)
        {
            this.coll.Add(t);
            count = this.coll.Count;

            if (this.signalled || count < this.expectedCount)
            {
                return;
            }

            this.signalled = true;
        }

        // Notify outside the lock so subscriber callbacks never run while it is held.
        this.subject.OnNext(count);
        this.subject.OnCompleted();
    }
}
/// <summary>
/// Small fluent helpers for composing timed observable sequences.
/// </summary>
public static class ReactiveExtensionExtensions
{
    /// <summary>
    /// Creates a sequence that yields <paramref name="t"/> when a timer for
    /// <paramref name="dueTime"/> fires.
    /// </summary>
    /// <typeparam name="T">Type of the emitted item.</typeparam>
    /// <param name="t">The item to emit.</param>
    /// <param name="dueTime">Delay passed to <c>Observable.Timer</c>.</param>
    /// <returns>An observable producing <paramref name="t"/> on the timer tick.</returns>
    public static IObservable<T> EmitIn<T>(this T t, TimeSpan dueTime)
    {
        var tick = Observable.Timer(dueTime: dueTime);
        return tick.Select(_ => t);
    }

    /// <summary>
    /// First half of the fluent <c>.And(item).In(delay)</c> chain: pairs an existing
    /// sequence with the next item so that <c>In</c> can schedule it.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="sequence">The sequence built so far.</param>
    /// <param name="t">The item to be added next.</param>
    /// <returns>A tuple holding the sequence and the pending item.</returns>
    public static (IObservable<T>, T) And<T>(this IObservable<T> sequence, T t)
    {
        return ValueTuple.Create(sequence, t);
    }

    /// <summary>
    /// Second half of the fluent chain: merges the sequence from the tuple with a
    /// timed emission of the tuple's pending item.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="tup">The sequence/item pair produced by <c>And</c>.</param>
    /// <param name="dueTime">Delay after which the pending item is emitted.</param>
    /// <returns>The merged sequence.</returns>
    public static IObservable<T> In<T>(this ValueTuple<IObservable<T>, T> tup, TimeSpan dueTime)
    {
        var (soFar, pending) = tup;
        var delayed = Observable.Timer(dueTime: dueTime).Select(_ => pending);
        return soFar.Merge(delayed);
    }

    /// <summary>
    /// Like <c>TakeWhile</c>, but also lets through the first element for which the
    /// predicate fails.
    /// https://stackoverflow.com/questions/14697658/rx-observable-takewhile-checks-condition-before-each-element-but-i-need-to-perfo
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to take elements from.</param>
    /// <param name="predicate">Condition tested against each element.</param>
    /// <returns>
    /// The leading elements that satisfy <paramref name="predicate"/>, merged with the
    /// first element that does not.
    /// </returns>
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        // Publish hands both branches the same shared view of the source.
        return source.Publish(shared =>
        {
            var matching = shared.TakeWhile(predicate);
            var firstRejected = shared.SkipWhile(predicate).Take(1);
            return matching.Merge(firstRejected);
        });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment