Created
February 22, 2021 22:14
-
-
Save ahancock1/ddbacedb59ab144c6dbce03a44af10da to your computer and use it in GitHub Desktop.
Reactive Rate Gate using observables to limit access to resources async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Reactive; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
public class RateLimiter | |
{ | |
private readonly ConcurrentQueue<DateTime> _requests = | |
new ConcurrentQueue<DateTime>(); | |
private readonly AutoResetEvent _wait = | |
new AutoResetEvent(true); | |
public RateLimiter(TimeSpan interval, int limit) | |
{ | |
Interval = interval; | |
Limit = limit; | |
} | |
public TimeSpan Interval { get; set; } | |
public int Limit { get; set; } | |
public void Dispose() | |
{ | |
_wait.Dispose(); | |
} | |
private void Purge(DateTime target) | |
{ | |
while (_requests.TryPeek(out var timestamp)) | |
{ | |
if (timestamp >= target) | |
{ | |
break; | |
} | |
_requests.TryDequeue(out _); | |
} | |
} | |
private IObservable<Unit> Synchronize(IObservable<Unit> source) | |
{ | |
return Observable.Defer(() => | |
{ | |
_wait.WaitOne(); | |
return source.Finally(() => | |
_wait.Set()); | |
}); | |
} | |
public IObservable<Unit> Throttle() | |
{ | |
return Synchronize( | |
Wait().Do(_ => | |
_requests.Enqueue(DateTime.UtcNow))) | |
.SubscribeOn(TaskPoolScheduler.Default); | |
} | |
private IObservable<Unit> Wait() | |
{ | |
return Observable.FromAsync(async t => | |
{ | |
var now = DateTime.UtcNow; | |
var target = now - Interval; | |
Purge(target); | |
if (_requests.Count >= Limit) | |
{ | |
_requests.TryPeek(out var timestamp); | |
var delay = timestamp + Interval - now; | |
Console.WriteLine($"Throttle: {delay}"); | |
await Task.Delay(delay, t); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment