Created
June 19, 2018 14:38
-
-
Save kstrauss/99362404f0afff6e461f2ca79525f972 to your computer and use it in GitHub Desktop.
returns an observable that provides the first in a window and lets you know when the window closes and what was in the window
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Main() | |
{ | |
var closer = new Subject<Tuple<WindowState, IList<long>>>(); | |
closer.Select(c => { | |
if (c.Item1 == WindowState.Open) | |
{ | |
return $"Should email, with an event of {c.Item2.Single()}"; | |
} | |
else | |
{ | |
return $"should do nothing but will write a checkpoint for {c.Item2.Max()}, because we had {String.Join(",", c.Item2.Select(i=>i.ToString()))}"; | |
} | |
}).Timestamp().Dump(); | |
var outer = Observer.Create<IObservable<long>>(o => | |
{ | |
Console.WriteLine("Got start of window"); | |
var ws = new WindowInfo(); | |
//closer.OnNext(Tuple.Create<WindowState, IList<long>>(WindowState.Open, new List<long>())); | |
var inner = Observer.Create<long>(ob => { | |
ws.Add(ob); | |
if (ws.GetInternal().Count() <=1) // only want to signal on the first one | |
closer.OnNext( Tuple.Create<WindowState, IList<long>>(WindowState.Open, new List<long>() { ob})); | |
}, e1 => Console.Error.WriteLine("Inner Exception"), | |
// on complete() | |
() => { | |
Console.WriteLine("Closed Window"); | |
var closerValue = Tuple.Create<WindowState,IList<long>>(WindowState.Closed, ws.GetInternal()); | |
closer.OnNext(closerValue); | |
}); | |
var innerSub = o.Subscribe(inner); | |
}, e=>Console.Error.WriteLine("Got Exception"), ()=>Console.WriteLine("Done")); | |
var timer = Observable.Interval(TimeSpan.FromSeconds(1)) | |
.Do(o => Console.WriteLine("new value in window")) // just for debugging | |
.Window(5) | |
.Subscribe(outer); | |
Console.ReadLine(); | |
} | |
public enum WindowState{ | |
Open, | |
Closed | |
} | |
public class WindowInfo{ | |
protected List<long> internalValues = new List<long>(); | |
public void Add(long l){ | |
internalValues.Add(l); | |
} | |
public IList<long> GetInternal(){ | |
return internalValues; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment