Skip to content

Instantly share code, notes, and snippets.

@chgeuer
Last active April 23, 2020 16:24
Show Gist options
  • Save chgeuer/b76d5711afb593611809eed8f227cfb1 to your computer and use it in GitHub Desktop.
Save chgeuer/b76d5711afb593611809eed8f227cfb1 to your computer and use it in GitHub Desktop.
namespace RxCheckProviderCount
{
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// A single reply: which provider sent it, which request it answers, and its payload.
/// </summary>
class Message
{
    /// <summary>
    /// Creates a message with the given provider id, request id and payload.
    /// </summary>
    /// <param name="providerId">Identifier of the provider that produced the message.</param>
    /// <param name="requestId">Identifier of the request the message belongs to.</param>
    /// <param name="payload">Integer payload carried by the message.</param>
    public Message(string providerId, string requestId, int payload)
    {
        this.ProviderID = providerId;
        this.RequestID = requestId;
        this.Payload = payload;
    }

    /// <summary>Identifier of the provider that produced the message.</summary>
    public string ProviderID { get; set; }

    /// <summary>Identifier of the request the message belongs to.</summary>
    public string RequestID { get; set; }

    /// <summary>Integer payload carried by the message.</summary>
    public int Payload { get; set; }

    /// <summary>
    /// Formats the message as provider id, request id and payload separated by single spaces.
    /// </summary>
    public override string ToString()
    {
        return string.Format("{0} {1} {2}", this.ProviderID, this.RequestID, this.Payload);
    }
}
/// <summary>
/// Demo: process replies for one request until either the expected number of
/// distinct providers has answered or a timeout elapses, followed by a small
/// synchronous self-check of <see cref="ProviderCounter{T}"/>.
/// </summary>
class Program
{
    /// <summary>
    /// Builds a timed sequence of messages, processes those belonging to "req1"
    /// until three distinct providers replied or 800 ms passed, then runs the
    /// counter self-check and waits for a key press.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the <see cref="ProviderCounter{T}"/> self-check observes an unexpected state.
    /// </exception>
    static void Main()
    {
        var (p1, p2, p3, p4) = ("provider1", "provider2", "provider3", "provider4");
        var (r1, r2) = ("req1", "req2");
        var timeout = TimeSpan.FromMilliseconds(800);

        // Each message is emitted after its own delay, measured from subscription.
        var observable =
            new Message(p1, r1, 1).EmitIn(TimeSpan.FromMilliseconds(50))
            .And(new Message(p1, r2, 2)).In(TimeSpan.FromMilliseconds(60))
            .And(new Message(p2, r1, 3)).In(TimeSpan.FromMilliseconds(70))
            .And(new Message(p3, r1, 4)).In(TimeSpan.FromMilliseconds(80))
            .And(new Message(p4, r1, 5)).In(TimeSpan.FromMilliseconds(500));

        var counter = new ProviderCounter<string>(expectedNumberOfProviders: 3);
        var responseMustBeReadyBy = DateTimeOffset.Now.Add(timeout);
        var waitedLongEnough = Observable.Timer(dueTime: responseMustBeReadyBy).Select(_ => 1);
        var whenExpiredOrAllProvidersReplied = waitedLongEnough.Merge(counter.Observable);

        var subscription = observable
            .Where(m => m.RequestID == r1)
            .Do(m => Console.WriteLine($"Saw a {m}"))
            .TakeUntil(whenExpiredOrAllProvidersReplied)
            .Subscribe(
                onNext: m =>
                {
                    Console.WriteLine($"Process {m.ProviderID} {m.RequestID} {m.Payload} ");

                    // Count the provider only AFTER its message was processed. The counter
                    // signals synchronously from inside Add; if Add ran upstream of TakeUntil,
                    // the sequence would already be completed when the message that reaches
                    // the expected count arrives, and that message would be dropped.
                    counter.Add(m.ProviderID);
                },
                onCompleted: () => Console.WriteLine("Done"));

        // Synchronous self-check: the counter must fire only once three DISTINCT values were added.
        bool thingFired = false;
        var c = new ProviderCounter<int>(3);
        var selfCheckSubscription = c.Observable.Subscribe(onNext: _ => thingFired = true);

        void Expect(bool expectedFired, int step)
        {
            if (thingFired != expectedFired)
            {
                throw new InvalidOperationException(
                    $"ProviderCounter self-check failed at step {step}: fired={thingFired}, expected={expectedFired}.");
            }
        }

        Expect(false, 0);
        c.Add(1); Expect(false, 1);
        c.Add(2); Expect(false, 2);
        c.Add(2); Expect(false, 3);
        c.Add(2); Expect(false, 4);
        c.Add(1); Expect(false, 5);
        c.Add(2); Expect(false, 6);
        c.Add(3); Expect(true, 7);

        Console.WriteLine("We're octoawesome");
        Console.ReadLine();

        // Release the subscriptions (and any still-pending timers) before exiting.
        selfCheckSubscription.Dispose();
        subscription.Dispose();
    }
}
/// <summary>
/// Collects distinct values and signals through <see cref="Observable"/> once the
/// number of distinct values reaches the expected count.
/// </summary>
/// <typeparam name="T">Type of the values being counted (e.g. a provider id).</typeparam>
public class ProviderCounter<T>
{
    private readonly int expectedCount;
    private readonly HashSet<T> coll = new HashSet<T>();
    private readonly object _lock = new object();
    private readonly Subject<int> subject;

    // Set (under _lock) once the threshold was reached, so the signal is sent exactly once.
    private bool signalled;

    /// <summary>
    /// Emits the number of distinct values once the expected count is reached, then completes.
    /// </summary>
    public IObservable<int> Observable { get; }

    /// <summary>
    /// Creates a counter that signals after <paramref name="expectedNumberOfProviders"/>
    /// distinct values have been added.
    /// </summary>
    /// <param name="expectedNumberOfProviders">Number of distinct values to wait for.</param>
    public ProviderCounter(int expectedNumberOfProviders)
    {
        this.expectedCount = expectedNumberOfProviders;
        this.subject = new Subject<int>();
        this.Observable = this.subject.AsObservable();
    }

    /// <summary>
    /// Records <paramref name="t"/>. Duplicates do not increase the count. When the number
    /// of distinct values reaches the expected count, <see cref="Observable"/> emits that
    /// count and completes; later calls are ignored.
    /// </summary>
    /// <param name="t">The value to record.</param>
    public void Add(T t)
    {
        int count;

        // HashSet<T> is not safe for concurrent mutation; guard it with the lock.
        lock (this._lock)
        {
            if (this.signalled)
            {
                return;
            }

            this.coll.Add(t);
            count = this.coll.Count;
            if (count < this.expectedCount)
            {
                return;
            }

            this.signalled = true;
        }

        // Notify outside the lock so subscriber callbacks never run while it is held.
        this.subject.OnNext(count);
        this.subject.OnCompleted();
    }
}
/// <summary>
/// Small helpers for building timed observable sequences fluently.
/// </summary>
public static class ReactiveExtensionExtensions
{
    /// <summary>
    /// Creates a sequence that emits <paramref name="t"/> once the given time has elapsed.
    /// </summary>
    /// <typeparam name="T">Type of the emitted item.</typeparam>
    /// <param name="t">The item to emit.</param>
    /// <param name="dueTime">Delay before the item is emitted.</param>
    /// <returns>A sequence built from a timer whose tick is projected to <paramref name="t"/>.</returns>
    public static IObservable<T> EmitIn<T>(this T t, TimeSpan dueTime)
    {
        var timer = Observable.Timer(dueTime);
        return timer.Select(_ => t);
    }

    /// <summary>
    /// Pairs an existing sequence with one more item, to be completed by
    /// <see cref="In{T}(ValueTuple{IObservable{T}, T}, TimeSpan)"/>.
    /// </summary>
    /// <typeparam name="T">Type of the sequence elements.</typeparam>
    /// <param name="sequence">The sequence built so far.</param>
    /// <param name="t">The additional item.</param>
    /// <returns>A tuple holding the sequence and the additional item.</returns>
    public static (IObservable<T>, T) And<T>(this IObservable<T> sequence, T t)
    {
        return (sequence, t);
    }

    /// <summary>
    /// Merges the sequence from the tuple with a timed emission of the tuple's item.
    /// </summary>
    /// <typeparam name="T">Type of the sequence elements.</typeparam>
    /// <param name="tup">Tuple of the sequence built so far and the item to add.</param>
    /// <param name="dueTime">Delay before the added item is emitted.</param>
    /// <returns>The merged sequence.</returns>
    public static IObservable<T> In<T>(this ValueTuple<IObservable<T>, T> tup, TimeSpan dueTime)
    {
        var (sequence, item) = tup;
        var delayedItem = item.EmitIn(dueTime);
        return sequence.Merge(delayedItem);
    }

    /// <summary>
    /// Like TakeWhile, but also yields the first element for which the predicate fails.
    /// https://stackoverflow.com/questions/14697658/rx-observable-takewhile-checks-condition-before-each-element-but-i-need-to-perfo
    /// </summary>
    /// <typeparam name="T">Type of the sequence elements.</typeparam>
    /// <param name="source">The source sequence.</param>
    /// <param name="predicate">Condition that elements must satisfy to keep the sequence going.</param>
    /// <returns>The leading matching elements followed by the first non-matching one.</returns>
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        // Publish shares a single subscription to the source between both branches.
        return source.Publish(shared =>
        {
            var matching = shared.TakeWhile(predicate);
            var firstNonMatching = shared.SkipWhile(predicate).Take(1);
            return matching.Merge(firstNonMatching);
        });
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment