Created
May 16, 2022 21:58
-
-
Save noseratio/083be04528f14d4b7205e79b86e3ed1d to your computer and use it in GitHub Desktop.
An async queue which can be cleared from a producer end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://twitter.com/noseratio/status/1526314613364490241
/// <summary>
/// An asynchronous FIFO queue whose pending items can be discarded from the
/// producer side via <see cref="Clear"/>.
/// </summary>
/// <remarks>
/// All access to the underlying <see cref="Queue{T}"/> is serialized through a
/// single-slot <see cref="SemaphoreSlim"/>. Every operation observes both the
/// caller's cancellation token and the queue-wide token that is cancelled by
/// <see cref="Complete"/> and <see cref="DisposeAsync"/>.
/// </remarks>
/// <typeparam name="T">The type of the queued items.</typeparam>
public sealed class AsyncQueue<T> : IAsyncDisposable
{
    private readonly Queue<T> _queue = new();
    private readonly SemaphoreSlim _semaphore = new(initialCount: 1);
    private readonly CancellationTokenSource _cts = new();

    // Completed by Enqueue to wake waiting consumers; replaced by Dequeue once it has fired.
    private TaskCompletionSource<DBNull> _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // 0 = live, 1 = DisposeAsync has started; makes disposal idempotent.
    private int _disposed;

    /// <summary>Removes all items currently held in the queue.</summary>
    /// <param name="cancelToken">Cancels waiting for access to the queue.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue was completed or disposed.
    /// </exception>
    public async Task Clear(CancellationToken cancelToken)
    {
        using var cts = CreateLinkedSource(cancelToken);
        await _semaphore.WaitAsync(cts.Token);
        try
        {
            _queue.Clear();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Cancels every pending operation and makes all subsequent
    /// <see cref="Enqueue"/>, <see cref="Dequeue"/> and <see cref="Clear"/> calls
    /// fail with <see cref="OperationCanceledException"/>. Items still in the
    /// queue are not handed out afterwards.
    /// </summary>
    public void Complete()
    {
        _cts.Cancel();
    }

    /// <summary>Appends an item to the queue and wakes waiting consumers.</summary>
    /// <param name="item">The item to append.</param>
    /// <param name="cancelToken">Cancels waiting for access to the queue.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue was completed or disposed.
    /// </exception>
    public async Task Enqueue(T item, CancellationToken cancelToken)
    {
        using var cts = CreateLinkedSource(cancelToken);
        await _semaphore.WaitAsync(cts.Token);
        try
        {
            _queue.Enqueue(item);
            // "Try": the signal may already be set by an earlier Enqueue.
            _itemTcs.TrySetResult(DBNull.Value);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the item at the head of the queue, waiting
    /// asynchronously until one becomes available.
    /// </summary>
    /// <param name="cancelToken">Cancels the wait for this call only.</param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue was completed or disposed.
    /// </exception>
    public async Task<T> Dequeue(CancellationToken cancelToken)
    {
        using var cts = CreateLinkedSource(cancelToken);
        while (true)
        {
            Task<DBNull> itemSignal;
            await _semaphore.WaitAsync(cts.Token);
            try
            {
                cts.Token.ThrowIfCancellationRequested();
                if (_queue.TryDequeue(out var item))
                {
                    return item;
                }

                // The previous signal has already fired; arm a fresh one to wait on.
                if (_itemTcs.Task.IsCompleted)
                {
                    _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                }

                itemSignal = _itemTcs.Task;
            }
            finally
            {
                _semaphore.Release();
            }

            // Cancellation is routed through a per-call source rather than the shared
            // _itemTcs, so one consumer's cancellation cannot cancel other waiters.
            var cancelSignal = new TaskCompletionSource<DBNull>(TaskCreationOptions.RunContinuationsAsynchronously);
            using var rego = cts.Token.Register(() => cancelSignal.TrySetCanceled(cts.Token));

            // The outer await yields whichever task finished first; the inner await
            // rethrows if that task is the cancelled one.
            await await Task.WhenAny(itemSignal, cancelSignal.Task);
        }
    }

    /// <summary>
    /// Yields items as they become available. The sequence never completes on
    /// its own; it ends by throwing when cancelled.
    /// </summary>
    /// <param name="cancelToken">Cancels the enumeration.</param>
    /// <returns>An endless asynchronous stream of dequeued items.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue was completed or disposed.
    /// </exception>
    public async IAsyncEnumerable<T> DequeueAll(
        [EnumeratorCancellation] CancellationToken cancelToken)
    {
        while (true)
        {
            yield return await this.Dequeue(cancelToken);
        }
    }

    /// <summary>
    /// Cancels pending operations, discards remaining items and releases the
    /// underlying synchronization resources. Safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Only the first caller performs the disposal.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        await _semaphore.WaitAsync();
        try
        {
            _queue.Clear();
        }
        finally
        {
            _semaphore.Release();
            _cts.Dispose();
            _semaphore.Dispose();
        }
    }

    /// <summary>
    /// Creates a source that is cancelled when either the caller's token or the
    /// queue-wide token is cancelled.
    /// </summary>
    private CancellationTokenSource CreateLinkedSource(CancellationToken cancelToken)
    {
        return CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _cts.Token);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment