Created
October 23, 2023 20:05
-
-
Save StephenCleary/eae3cb18188258b700581b20acab548d to your computer and use it in GitHub Desktop.
A System.Threading.Channel<T> that allows custom bounds (more complex than counting items)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Channels; | |
using Nito.AsyncEx; | |
// All members must be safe to call while under lock. | |
public interface ICustomBounds<in T> | |
{ | |
bool IsEmpty { get; } | |
bool IsFull { get; } | |
void Add(T item); | |
void Subtract(T item); | |
} | |
public sealed class CustomBoundedChannel<T> : Channel<T> | |
{ | |
public CustomBoundedChannel(UnboundedChannelOptions options, ICustomBounds<T> bounds) | |
{ | |
_bounds = bounds; | |
_channel = Channel.CreateUnbounded<T>(options); | |
_mutex = new(); | |
_completedOrNotFull = new(_mutex); | |
_completedOrNotEmpty = new(_mutex); | |
Reader = new ChannelReader(this); | |
Writer = new ChannelWriter(this); | |
} | |
private readonly ICustomBounds<T> _bounds; | |
private readonly Channel<T> _channel; | |
private readonly AsyncLock _mutex; | |
private readonly AsyncConditionVariable _completedOrNotFull; | |
private readonly AsyncConditionVariable _completedOrNotEmpty; | |
private sealed class ChannelWriter : ChannelWriter<T> | |
{ | |
public ChannelWriter(CustomBoundedChannel<T> parent) | |
{ | |
_parent = parent; | |
} | |
public override bool TryWrite(T item) | |
{ | |
using var key = _parent._mutex.Lock(); | |
if (_parent._bounds.IsFull) | |
return false; | |
if (!_parent._channel.Writer.TryWrite(item)) | |
return false; | |
_parent._bounds.Add(item); | |
if (!_parent._bounds.IsEmpty) | |
_parent._completedOrNotEmpty.Notify(); | |
return true; | |
} | |
public override bool TryComplete(Exception? error = null) | |
{ | |
using var key = _parent._mutex.Lock(); | |
if (!_parent._channel.Writer.TryComplete(error)) | |
return false; | |
_parent._completedOrNotEmpty.NotifyAll(); | |
_parent._completedOrNotFull.NotifyAll(); | |
return true; | |
} | |
public override async ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default) | |
{ | |
using var key = _parent._mutex.Lock(CancellationToken.None); | |
while (true) | |
{ | |
if (_parent._channel.Reader.Completion.IsCompleted) | |
return false; | |
if (!_parent._bounds.IsFull) | |
return true; | |
await _parent._completedOrNotFull.WaitAsync(cancellationToken).ConfigureAwait(false); | |
} | |
} | |
public override async ValueTask WriteAsync(T item, CancellationToken cancellationToken = default) | |
{ | |
using var key = _parent._mutex.Lock(CancellationToken.None); | |
while (true) | |
{ | |
if (_parent._channel.Reader.Completion.IsCompleted) | |
throw new ChannelClosedException(); | |
if (!_parent._bounds.IsFull) | |
{ | |
if (!_parent._channel.Writer.TryWrite(item)) | |
continue; | |
_parent._bounds.Add(item); | |
if (!_parent._bounds.IsEmpty) | |
_parent._completedOrNotEmpty.Notify(); | |
return; | |
} | |
await _parent._completedOrNotFull.WaitAsync(cancellationToken).ConfigureAwait(false); | |
} | |
} | |
private readonly CustomBoundedChannel<T> _parent; | |
} | |
private sealed class ChannelReader : ChannelReader<T> | |
{ | |
public ChannelReader(CustomBoundedChannel<T> parent) | |
{ | |
_parent = parent; | |
} | |
public override bool TryRead(out T item) | |
{ | |
using var key = _parent._mutex.Lock(); | |
if (!_parent._channel.Reader.TryRead(out item!)) | |
return false; | |
_parent._bounds.Subtract(item); | |
if (!_parent._bounds.IsFull) | |
_parent._completedOrNotFull.Notify(); | |
return true; | |
} | |
public override async ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default) | |
{ | |
using var key = _parent._mutex.Lock(CancellationToken.None); | |
while (true) | |
{ | |
if (_parent._channel.Reader.Completion.IsCompleted) | |
return false; | |
if (!_parent._bounds.IsEmpty) | |
return true; | |
await _parent._completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false); | |
} | |
} | |
public override async ValueTask<T> ReadAsync(CancellationToken cancellationToken = default) | |
{ | |
using var key = _parent._mutex.Lock(cancellationToken); | |
while (true) | |
{ | |
if (!_parent._bounds.IsEmpty) | |
{ | |
if (!_parent._channel.Reader.TryRead(out var result)) | |
continue; | |
_parent._bounds.Subtract(result); | |
if (!_parent._bounds.IsFull) | |
_parent._completedOrNotFull.Notify(); | |
return result; | |
} | |
if (_parent._channel.Reader.Completion.IsCompleted) | |
throw new ChannelClosedException(); | |
await _parent._completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(false); | |
} | |
} | |
public override Task Completion => _parent._channel.Reader.Completion; | |
public override bool CanCount => _parent._channel.Reader.CanCount; | |
public override int Count => _parent._channel.Reader.Count; | |
public override bool CanPeek => _parent._channel.Reader.CanPeek; | |
public override bool TryPeek(out T item) => _parent._channel.Reader.TryPeek(out item!); | |
private readonly CustomBoundedChannel<T> _parent; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example channel of strings that applies a bound based on the length of the strings rather than the count of the strings: