Last active
September 15, 2015 11:58
-
-
Save Dorus/92d9d5336eacdea77d8a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace Server {
    static class OverflowQueue_ {
        /// <summary>
        /// Buffers the elements of the source sequence in a queue and replays them to
        /// the observer on <paramref name="scheduler"/>. Elements that arrive while the
        /// observer is still busy stay queued; <paramref name="filter"/> decides, based on
        /// an element's age and the current queue length, whether a queued element is
        /// still relevant. Elements judged irrelevant are dropped instead of delivered.
        /// </summary>
        /// <remarks>
        /// <paramref name="filter"/> is consulted in two places:
        /// when a new element arrives it is asked about the oldest queued element (with the
        /// queue length before the new element is added), and when an element is dequeued
        /// for delivery it is asked about that element (with the number of elements still
        /// queued behind it). An error or completion of the source is held back until the
        /// queue has been drained.
        /// </remarks>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="filter">
        /// Receives the age of an element and a queue length; returns <c>true</c> when the
        /// element is still relevant and <c>false</c> when it may be discarded.
        /// </param>
        /// <param name="scheduler">
        /// Scheduler used to timestamp elements, measure their age and notify observers on.
        /// </param>
        /// <returns>The resulting sequence.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/>, <paramref name="filter"/> or
        /// <paramref name="scheduler"/> is <c>null</c>.
        /// </exception>
        public static IObservable<TSource> OverflowQueue<TSource>(this IObservable<TSource> source, Func<TimeSpan, int, Boolean> filter, IScheduler scheduler) {
            if (source == null) {
                throw new ArgumentNullException("source");
            }
            if (filter == null) {
                throw new ArgumentNullException("filter");
            }
            if (scheduler == null) {
                throw new ArgumentNullException("scheduler");
            }

            return Observable.Create<TSource>(o => {
                var gate = new Object();
                var queue = new ConcurrentQueue<Timestamped<TSource>>();
                var running = false;     // true while a drain loop owns delivery; guarded by gate
                var completed = false;   // source completed while a drain was running; guarded by gate
                Exception error = null;  // source failed while a drain was running; guarded by gate

                // Handles for the scheduled work, so that disposing the subscription also
                // cancels a drain loop or terminal notification that has not run yet.
                var drain = new SerialDisposable();
                var terminal = new SerialDisposable();

                Action<Action> runQueue = self => {
                    Timestamped<TSource> item;
                    while (queue.TryDequeue(out item)) {
                        bool relevant;
                        try {
                            var age = scheduler.Now.Subtract(item.Timestamp);
                            relevant = filter(age, queue.Count);
                        } catch (Exception ex) {
                            // A failing filter terminates the result sequence instead of
                            // escaping onto the scheduler. 'running' stays set on purpose so
                            // that no further drain is started afterwards.
                            o.OnError(ex);
                            return;
                        }
                        if (relevant) {
                            o.OnNext(item.Value);
                        }
                    }

                    bool drained;
                    Exception pendingError = null;
                    var pendingCompletion = false;
                    lock (gate) {
                        // Producers enqueue before taking the gate, so an element added after
                        // the loop above is either visible here (and we go around again) or
                        // its producer will observe running == false and start a new drain.
                        drained = queue.IsEmpty;
                        if (drained) {
                            pendingError = error;
                            pendingCompletion = completed;
                            running = false;
                        }
                    }

                    if (!drained) {
                        self();
                        return;
                    }

                    // Deliver a deferred terminal notification outside the lock.
                    if (pendingError != null) {
                        o.OnError(pendingError);
                    } else if (pendingCompletion) {
                        o.OnCompleted();
                    }
                };

                var subscription = source.Timestamp(scheduler).Do(_ => {
                    Timestamped<TSource> oldest;
                    if (queue.TryPeek(out oldest)) {
                        var age = scheduler.Now.Subtract(oldest.Timestamp);
                        if (!filter(age, queue.Count)) {
                            // Remove the oldest element if it is no longer relevant.
                            // NOTE: peek and dequeue are not atomic with respect to the drain
                            // loop, so the element removed here may not be the one inspected.
                            queue.TryDequeue(out oldest);
                        }
                    }
                })
                .Subscribe(el => {
                    queue.Enqueue(el);
                    lock (gate) {
                        if (running) {
                            // An active drain loop will pick the element up.
                            return;
                        }
                        running = true;
                    }
                    drain.Disposable = scheduler.Schedule(runQueue);
                }, err => terminal.Disposable = scheduler.Schedule(() => {
                    lock (gate) {
                        if (running) {
                            // Let the drain loop report the error once the queue is empty.
                            error = err;
                            return;
                        }
                    }
                    o.OnError(err);
                }), () => terminal.Disposable = scheduler.Schedule(() => {
                    lock (gate) {
                        if (running) {
                            // Let the drain loop report completion once the queue is empty.
                            completed = true;
                            return;
                        }
                    }
                    o.OnCompleted();
                }));

                return new CompositeDisposable(subscription, drain, terminal);
            });
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment