Skip to content

Instantly share code, notes, and snippets.

@itn3000
Last active April 27, 2022 04:32
Show Gist options
  • Save itn3000/b0393d041ca0834b7aa4ee89acc9e873 to your computer and use it in GitHub Desktop.
Save itn3000/b0393d041ca0834b7aa4ee89acc9e873 to your computer and use it in GitHub Desktop.
memo about System.Threading.RateLimiting(Japanese)

共通事項

  • 実装コードは https://github.com/dotnet/runtime/tree/main/src/libraries/System.Threading.RateLimiting/src
  • nugetページは https://www.nuget.org/packages/System.Threading.RateLimiting
  • 7.0pre1をベースに記述しているので、7.0リリース時点でまた変更点があるかもしれない
  • 同期的にはRateLimiter.Acquire(int permitCount)、非同期ではRateLimiter.WaitAsync(int permitCount)でRateLimitLeaseを取得する
  • RateLimitLeaseはIDisposableなので、必ずDispose(using)する
  • 返ってくるRateLimitLeaseは失敗の結果が入っている可能性がある
  • 許可されたかどうかはRateLimitLease.IsAquiredで判断する
  • 成功した場合、Disposeがされた時点でまたRateLimiterに空きが確保される
    • Disposeを忘れると空きが確保されなくなる
  • 個別の実装によるが、RateLimitLease.GetMetadataで値を取得できる場合がある
    • デフォルトで予約されているものはSystem.Threading.RateLimiting.MetadataNameのstaticインスタンスとして取得できる
  • ベースクラスのRateLimiterがIDisposableとIAsyncDisposableの両方を実装しているので、usingとawait using両方とも可能
  • RateLimiterで実装必須なのは以下
    • int GetAvailablePermits()
    • RateLimitLease AcquireCore(int permitCount)
    • ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken)
  • RateLimiterを実装する場合、RateLimitLeaseも実装が必要
  • QueueProcessingOrderはWaitAsyncで待った場合にFIFOかLIFOで取り出されるかを設定する
    • OldestFirstがFIFO、NewestFirstがLIFO
    • それぞれ派生クラスで使っているだけなので、RateLimiterを拡張させる時に必ずしも使う必要は無い
  • 基本的にリエントラントではないという事に注意

TokenBucketLimiterについて

  • アルゴリズムについて: https://ja.wikipedia.org/wiki/%E3%83%88%E3%83%BC%E3%82%AF%E3%83%B3%E3%83%90%E3%82%B1%E3%83%83%E3%83%88
  • 要は bytes/sec みたいな流量制限を行うことができる
  • b=tokenLimit
  • S=replenishmentPeriod
  • n=permitCount
  • r=tokensPerPeriod
  • rとSの値設定が重要(流量制御)
  • autoReplenishmentはtrueにすると、replenishmentPeriodごとにトークンが補充されるが、falseにすると、bool TryReplenish()で手動でトークンを追加する必要が出てくる
  • queueLimitはWaitAsyncの時に影響する(permitCountの合計値が溢れない限り待たされる)
  • bが小さすぎると理論値のr(token/sec)よりも遅くなることがある
  • WaitAsyncしてキューから溢れるか、Acquireして失敗した場合はRateLimitLease.IsAquired = falseで返される
    • キューの長さを十分にとった場合、WaitAsyncした方が結果は理想値に近くなる
  • RateLimitLease.GetMetadata(MetadataName.Retry, out TimeSpan retry) で指定された分だけ待ってからリトライすることを想定されている
  • permitCount=0にすると、確認だけして成功失敗を返し、トークンの確保操作は行わないという特殊動作を行う
  • WaitAsyncする時、キューに入る時はTaskCompletionSourceをアロケートして、ValueTaskの中に入れるので、頻繁にキュー待ちするようなケースはWaitAsyncを使う時は要注意

ConcurrencyLimitについて

  • Semaphoreのようなもの
  • Semaphoreと異なり、所有者以外の解放はできない
  • パラメーターは以下
    • permitLimit = 一度に許可されるpermitCountの数
    • queueLimit = WaitAsyncする時に影響、超えるとRateLimitLease.IsAquired = falseで返される
  • AcquireあるいはWaitAsyncする時重みづけができる(permitCount=1にするとSemaphoreと同じ運用が可能)
  • RetryAfterはメタデータとして含まれないので、使う側が判断してウェイトを入れる必要がある
  • queueLimitを超えた量WaitAsyncを行うと、即座にRateLimitLease.IsAquired = falseで返される
  • permitCount=0にすると、確認だけして成功失敗を返し、トークンの確保操作は行わないという特殊動作を行う
  • WaitAsyncする時、キューに入る時はTaskCompletionSourceをアロケートして、ValueTaskの中に入れるので、頻繁にキュー待ちするようなケースはWaitAsyncを使う時は要注意
  • RWロックを実現するのに使える(Wの時はreq=permitLimitにして、Rの時はreq=1にする)
    • デッドロックに注意(Wを取った同じスレッドでRを取得してはいけない)
    • よりしっかりしたものを使いたいときは Microsoft.VisualStudio.Threading のAsyncReaderWriterLock等を使うことを推奨
// See https://aka.ms/new-console-template for more information
using System.Threading.RateLimiting;
using System.Diagnostics;
await Concurrency();
// await TokenBucket();
async Task Concurrency()
{
await using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(
100, QueueProcessingOrder.OldestFirst, 100
));
var totalsw = new Stopwatch();
totalsw.Start();
await Task.WhenAll(
Enumerable.Range(0, 10).Select(async idx =>
{
await Task.Yield();
for (int i = 0; i < 100; i++)
{
var sw = new Stopwatch();
sw.Start();
while (true)
{
using (var lease = await limiter.WaitAsync(50))
{
// Console.WriteLine($"{idx}, {i}, {lease.IsAcquired}");
if (lease.IsAcquired)
{
await Task.Delay(100).ConfigureAwait(false);
break;
}
if (lease.TryGetMetadata<TimeSpan>(MetadataName.RetryAfter, out var interval))
{
Console.WriteLine($"do delay: {interval}");
await Task.Delay(interval);
}
else
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
sw.Stop();
Console.WriteLine($"{idx}, {i}, {sw.Elapsed}");
}
})
);
totalsw.Stop();
Console.WriteLine($"total {totalsw.Elapsed}");
}
async Task TokenBucket()
{
await using var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(
200, QueueProcessingOrder.OldestFirst, 100, TimeSpan.FromMilliseconds(1000), 100)
);
var totalsw = new Stopwatch();
totalsw.Start();
await Task.WhenAll(
Enumerable.Range(0, 1).Select(async idx =>
{
await Task.Yield();
for (int i = 0; i < 100; i++)
{
var sw = new Stopwatch();
sw.Start();
while (true)
{
using (var lease = await limiter.WaitAsync(40))
{
// Console.WriteLine($"{idx}, {i}, {lease.IsAcquired}");
if (lease.IsAcquired)
{
break;
}
lease.TryGetMetadata<TimeSpan>(MetadataName.RetryAfter, out var interval);
Console.WriteLine($"do delay: {interval}");
await Task.Delay(interval);
}
}
sw.Stop();
Console.WriteLine($"{idx}, {i}, {sw.Elapsed}");
}
})
);
totalsw.Stop();
Console.WriteLine($"total {totalsw.Elapsed}");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment