Last active
November 9, 2023 19:18
-
-
Save noseratio/0989d5f986190b62299b8bf6a847a61e to your computer and use it in GitHub Desktop.
Simple async wrapper around .NET Queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A simple asynchronous wrapper around <see cref="Queue{T}"/>. Every access to the
/// underlying queue is serialized through a single-slot <see cref="SemaphoreSlim"/>,
/// and consumers that find the queue empty wait asynchronously until an item is
/// enqueued, the caller's token is cancelled, or the queue is completed/disposed.
/// </summary>
/// <typeparam name="T">The type of the queued items.</typeparam>
public sealed class AsyncQueue<T> : IAsyncDisposable
{
    private readonly Queue<T> _queue = new();
    private readonly SemaphoreSlim _semaphore = new(initialCount: 1, maxCount: 1);
    private readonly CancellationTokenSource _cts = new();

    // Captured once up front: reading _cts.Token after _cts has been disposed throws,
    // whereas a token obtained earlier can still be inspected and linked to.
    private readonly CancellationToken _completionToken;

    // Signalled by EnqueueAsync; replaced by DequeueAsync once it has been consumed.
    private TaskCompletionSource _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // 0 = live, 1 = DisposeAsync has started. Makes disposal idempotent.
    private int _disposed;

    /// <summary>Creates an empty queue.</summary>
    public AsyncQueue()
    {
        _completionToken = _cts.Token;
    }

    /// <summary>
    /// Returns the item at the head of the queue without removing it.
    /// </summary>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    /// <returns>
    /// A tuple whose first element tells whether the queue had an item; the second
    /// element is that item, or the default value of <typeparamref name="T"/> when
    /// the queue was empty.
    /// </returns>
    public async ValueTask<(bool, T)> TryPeekAsync(CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            var hasValue = _queue.TryPeek(out var item);
            return (hasValue, item!);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>Removes all items currently in the queue.</summary>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    public async ValueTask ClearAsync(CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            _queue.Clear();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Signals completion. Items already in the queue can still be dequeued, but a
    /// <see cref="DequeueAsync"/> call that finds (or is already waiting on) an empty
    /// queue is cancelled instead of waiting forever.
    /// </summary>
    public void Complete()
    {
        if (Volatile.Read(ref _disposed) != 0)
        {
            // DisposeAsync cancels the same source, so there is nothing left to signal.
            return;
        }

        try
        {
            _cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Lost a race with a concurrent DisposeAsync, which cancels the source
            // before disposing it; completion has therefore already been signalled.
        }
    }

    /// <summary>Appends an item to the queue and wakes any waiting consumers.</summary>
    /// <param name="item">The item to append.</param>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    public async ValueTask EnqueueAsync(T item, CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            _queue.Enqueue(item);
            _itemTcs.TrySetResult();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the item at the head of the queue, waiting asynchronously
    /// while the queue is empty.
    /// </summary>
    /// <param name="cancelToken">Cancels the operation.</param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue was empty and
    /// <see cref="Complete"/> or <see cref="DisposeAsync"/> has been called.
    /// </exception>
    public async ValueTask<T> DequeueAsync(CancellationToken cancelToken)
    {
        while (true)
        {
            TaskCompletionSource pending;
            await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
            try
            {
                if (_queue.TryDequeue(out var item))
                {
                    return item;
                }

                // A completed source belongs to an earlier enqueue whose item is
                // already gone; install a fresh one while still holding the semaphore
                // so that the next EnqueueAsync is guaranteed to signal it.
                if (_itemTcs.Task.IsCompleted)
                {
                    _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                }

                pending = _itemTcs;
            }
            finally
            {
                _semaphore.Release();
            }

            // Observe the completion token as well as the caller's token; otherwise
            // Complete()/DisposeAsync() could never release a consumer parked here.
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _completionToken);
            try
            {
                await pending.Task.WaitAsync(linkedCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException ex) when (cancelToken.IsCancellationRequested)
            {
                // Keep reporting the caller's own token, as a direct wait on it would.
                throw new TaskCanceledException(null, ex, cancelToken);
            }
        }
    }

    /// <summary>
    /// Yields items as they become available. The sequence has no natural end: it
    /// stops only when a <see cref="DequeueAsync"/> call throws, e.g. on cancellation.
    /// </summary>
    /// <param name="cancelToken">Cancels the enumeration.</param>
    public async IAsyncEnumerable<T> DequeueAll(
        [EnumeratorCancellation] CancellationToken cancelToken)
    {
        while (true)
        {
            yield return await this.DequeueAsync(cancelToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Signals completion, discards any remaining items and releases the resources
    /// held by the queue. Calling it more than once is a no-op.
    /// </summary>
    /// <remarks>
    /// NOTE(review): operations still waiting on the semaphore when it is disposed
    /// here may observe <see cref="ObjectDisposedException"/>; callers should stop
    /// using the queue before disposing it.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        await _semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            _queue.Clear();
        }
        finally
        {
            _cts.Dispose();
            _semaphore.Release();
            _semaphore.Dispose();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment