Skip to content

Instantly share code, notes, and snippets.

@neon-sunset
Created March 25, 2026 13:17
Show Gist options
  • Select an option

  • Save neon-sunset/ce5bf77c8b9e315f3f87c611e9e31a0b to your computer and use it in GitHub Desktop.

Select an option

Save neon-sunset/ce5bf77c8b9e315f3f87c611e9e31a0b to your computer and use it in GitHub Desktop.
Mailbox processor in C#
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
/// <summary>
/// Usage promises a caller can make about a channel. Keeping these promises can improve performance.
/// </summary>
[Flags]
public enum ChRelaxations
{
    /// <summary>
    /// No promises are made: multiple readers and multiple writers are supported.
    /// </summary>
    None = 0,

    /// <summary>
    /// At most one reader reads from the channel at any given time.
    /// </summary>
    SingleReader = 1,

    /// <summary>
    /// At most one writer writes to the channel at any given time.
    /// </summary>
    SingleWriter = 2,

    /// <summary>
    /// Shorthand for <see cref="SingleReader"/> together with <see cref="SingleWriter"/>.
    /// </summary>
    SingleReaderWriter = SingleWriter | SingleReader,
}
/// <summary>
/// Selects what a bounded channel does with a write that arrives while it is full.
/// </summary>
public enum ChFullMode
{
    /// <summary>
    /// The oldest item in the channel is removed to make room for the item being written.
    /// </summary>
    DropOldest = 0,

    /// <summary>
    /// The most recently written item in the channel is replaced by the item being written.
    /// </summary>
    DropNewest = 1,

    /// <summary>
    /// The item being written is discarded; the items already in the channel are left untouched.
    /// </summary>
    DropWrite = 2,
}
/// <summary>
/// Factory and extension methods for strongly-typed channels used for messaging between tasks.
/// </summary>
public static class Ch
{
    /// <summary>
    /// Builds a bounded channel of the given capacity whose writes wait asynchronously while it is full.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="capacity">Maximum number of items the channel holds at once.</param>
    /// <param name="relaxations">Optional usage promises that can improve performance if adhered to.</param>
    /// <returns>A new bounded channel with asynchronous writes.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static AsyncCh<T> Bounded<T>(int capacity, ChRelaxations relaxations = ChRelaxations.None)
        => new AsyncCh<T>(capacity, relaxations);

    /// <summary>
    /// Builds a bounded channel of the given capacity that applies <paramref name="mode"/> when it is full.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="capacity">Maximum number of items the channel holds at once.</param>
    /// <param name="mode">What to do with a write that arrives while the channel is full.</param>
    /// <param name="dropped">Optional callback invoked with each item that gets dropped.</param>
    /// <param name="relaxations">Optional usage promises that can improve performance if adhered to.</param>
    /// <returns>A new bounded channel with synchronous writes.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static Ch<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Action<T>? dropped = null,
        ChRelaxations relaxations = ChRelaxations.None)
        => new Ch<T>(capacity, mode, dropped, relaxations);

    /// <summary>
    /// Builds an unbounded channel, i.e. one without a capacity limit.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="relaxations">Optional usage promises that can improve performance if adhered to.</param>
    /// <returns>A new unbounded channel.</returns>
    /// <remarks>
    /// This operation produces approximately 1.5KB of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static Ch<T> Unbounded<T>(ChRelaxations relaxations = ChRelaxations.None)
        => new Ch<T>(relaxations);

    /// <summary>
    /// Attaches a synchronous callback to the channel. The listener task is started and then
    /// discarded, so this call does not wait for any item to be processed.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="ch">Channel whose items are consumed.</param>
    /// <param name="callback">Invoked once for every item read from the channel.</param>
    /// <param name="ct">Optional token that stops the listener.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void Listen<T>(
        this Ch<T> ch,
        Action<T> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // Fire and forget: the returned task is intentionally not observed.
        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Attaches an asynchronous callback to the channel. The listener task is started and then
    /// discarded, so this call does not wait for any item to be processed.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="ch">Channel whose items are consumed.</param>
    /// <param name="callback">Awaited once for every item read from the channel.</param>
    /// <param name="ct">Optional token that stops the listener; it is also handed to the callback.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void Listen<T>(
        this Ch<T> ch,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // Fire and forget: the returned task is intentionally not observed.
        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Attaches a synchronous callback to the async channel. The listener task is started and then
    /// discarded, so this call does not wait for any item to be processed.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="ch">Async channel whose items are consumed.</param>
    /// <param name="callback">Invoked once for every item read from the channel.</param>
    /// <param name="ct">Optional token that stops the listener.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void Listen<T>(
        this AsyncCh<T> ch,
        Action<T> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // Fire and forget: the returned task is intentionally not observed.
        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Attaches an asynchronous callback to the async channel. The listener task is started and then
    /// discarded, so this call does not wait for any item to be processed.
    /// </summary>
    /// <typeparam name="T">Element type carried by the channel.</typeparam>
    /// <param name="ch">Async channel whose items are consumed.</param>
    /// <param name="callback">Awaited once for every item read from the channel.</param>
    /// <param name="ct">Optional token that stops the listener; it is also handed to the callback.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void Listen<T>(
        this AsyncCh<T> ch,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        // Fire and forget: the returned task is intentionally not observed.
        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }
}
/// <summary>
/// Listener loops shared by the channel and mailbox types: they pump items from a
/// <see cref="ChannelReader{T}"/> into a callback until the reader reports that no more
/// items will arrive or the token is cancelled.
/// </summary>
static class ChStatics<T>
{
    // TODO: Implement RentedCh<T> once the need arises.
    // // Each channel is going to take about 800-2KB of memory (assuming it has not grown too large).
    // // We could run into a scenario where we have a particularly large unbounded channel which
    // // roots too much memory, but we are not memory-constrained on most of our systems so it is fine.
    // static readonly int PoolSize = Environment.ProcessorCount * 4;
    // sealed class ChannelPool(UnboundedChannelOptions opts)
    //     : DefaultObjectPool<Channel<T>>(new PooledPolicy(opts), PoolSize);
    // sealed class PooledPolicy(UnboundedChannelOptions opts) : IPooledObjectPolicy<Channel<T>>
    // {
    //     public Channel<T> Create() => Channel.CreateUnbounded<T>(opts);
    //     public bool Return(Channel<T> obj)
    //     {
    //         if (obj.Reader.Count > 0)
    //         {
    //             // We don't want to return a channel that has items in it.
    //             obj.Writer.TryComplete();
    //             return false;
    //         }
    //         return true;
    //     }
    // }

    /// <summary>
    /// Reads every item from <paramref name="reader"/> and passes it to a synchronous callback.
    /// </summary>
    /// <param name="reader">Source of the items.</param>
    /// <param name="callback">Invoked once per item, in the order the items are read.</param>
    /// <param name="ct">Cancels the wait for further items.</param>
    /// <returns>
    /// A task that completes when the reader has no more items to offer. An exception thrown by
    /// <paramref name="callback"/> ends the loop and faults the task.
    /// </returns>
    public static async Task Listener(
        ChannelReader<T> reader,
        Action<T> callback,
        CancellationToken ct)
    {
        // Return to the caller right away and continue without capturing the caller's
        // SynchronizationContext. Task.Yield() would resume on that context, letting
        // callbacks for already-buffered items run on it (e.g. on a UI thread).
        await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);

        while (await reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            // Drain everything currently available before waiting again.
            while (reader.TryRead(out var item))
            {
                callback(item);
            }
        }
    }

    /// <summary>
    /// Reads every item from <paramref name="reader"/> and awaits an asynchronous callback for each.
    /// </summary>
    /// <param name="reader">Source of the items.</param>
    /// <param name="callback">Awaited once per item; the next item is not read until it completes.</param>
    /// <param name="ct">Cancels the wait for further items and is forwarded to <paramref name="callback"/>.</param>
    /// <returns>
    /// A task that completes when the reader has no more items to offer. An exception thrown by
    /// <paramref name="callback"/> ends the loop and faults the task.
    /// </returns>
    public static async Task Listener(
        ChannelReader<T> reader,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct)
    {
        // Same reasoning as the synchronous overload: yield without capturing the context.
        await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);

        while (await reader.WaitToReadAsync(ct).ConfigureAwait(false))
        {
            // Drain everything currently available before waiting again.
            while (reader.TryRead(out var item))
            {
                await callback(item, ct).ConfigureAwait(false);
            }
        }
    }
}
/// <summary>
/// A channel wrapper whose writes are synchronous and whose reads are asynchronous.
/// </summary>
/// <typeparam name="T">Element type carried by the channel.</typeparam>
public readonly struct Ch<T> : IAsyncEnumerable<T>, IDisposable
{
    /// <summary>
    /// The wrapped channel instance.
    /// </summary>
    public Channel<T> Value { get; }

    // Unbounded variant: only the single-reader/single-writer hints are configurable.
    internal Ch(ChRelaxations relaxations)
    {
        var options = new UnboundedChannelOptions();
        options.SingleReader = (relaxations & ChRelaxations.SingleReader) == ChRelaxations.SingleReader;
        options.SingleWriter = (relaxations & ChRelaxations.SingleWriter) == ChRelaxations.SingleWriter;
        Value = Channel.CreateUnbounded<T>(options);
    }

    // Bounded variant: translates ChFullMode into the framework's full mode.
    internal Ch(int capacity, ChFullMode mode, Action<T>? dropped, ChRelaxations relaxations)
    {
        // The options object is constructed before the mode is mapped, as an object initializer would do.
        var options = new BoundedChannelOptions(capacity);
        switch (mode)
        {
            case ChFullMode.DropOldest:
                options.FullMode = BoundedChannelFullMode.DropOldest;
                break;
            case ChFullMode.DropNewest:
                options.FullMode = BoundedChannelFullMode.DropNewest;
                break;
            case ChFullMode.DropWrite:
                options.FullMode = BoundedChannelFullMode.DropWrite;
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(mode), mode, null);
        }
        options.SingleReader = (relaxations & ChRelaxations.SingleReader) == ChRelaxations.SingleReader;
        options.SingleWriter = (relaxations & ChRelaxations.SingleWriter) == ChRelaxations.SingleWriter;
        Value = Channel.CreateBounded(options, dropped);
    }

    /// <summary>
    /// Puts a single item into the channel.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <exception cref="InvalidOperationException">The channel did not accept the item because it is closed.</exception>
    public void Write(T item)
    {
        if (Value.Writer.TryWrite(item))
        {
            return;
        }

        InvalidOperation("Cannot write to a closed channel.");
    }

    /// <summary>
    /// Puts every item of the span into the channel, in order.
    /// </summary>
    /// <param name="items">The items to enqueue.</param>
    /// <exception cref="InvalidOperationException">The channel did not accept an item because it is closed.</exception>
    public void WriteRange(ReadOnlySpan<T> items)
    {
        var writer = Value.Writer;
        for (var index = 0; index < items.Length; index++)
        {
            if (writer.TryWrite(items[index]))
            {
                continue;
            }

            InvalidOperation("Cannot write to a closed channel.");
        }
    }

    /// <summary>
    /// Asynchronously takes the next item from the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that cancels the read.</param>
    /// <returns>A task-like value that yields the item read from the channel.</returns>
    public ValueTask<T> Read(CancellationToken ct = default)
    {
        return Value.Reader.ReadAsync(ct);
    }

    /// <summary>
    /// Returns an enumerator that asynchronously walks the items of the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that cancels the enumeration.</param>
    /// <returns>An enumerator of <typeparamref name="T"/> that consumes the channel.</returns>
    [EditorBrowsable(EditorBrowsableState.Never)]
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
        => Value.Reader.ReadAllAsync(ct).GetAsyncEnumerator(ct);

    /// <summary>
    /// Operator shorthand for <see cref="Write(T)"/>.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    public void operator <<=(T item) => Write(item);

    /// <summary>
    /// Operator shorthand for <see cref="WriteRange(ReadOnlySpan{T})"/>.
    /// </summary>
    /// <param name="items">The items to enqueue.</param>
    public void operator <<=(ReadOnlySpan<T> items) => WriteRange(items);

    /// <summary>
    /// Completes the channel's writer, signalling that no more items will be written.
    /// </summary>
    public void Dispose() => Value.Writer.TryComplete();

    // Throw helper kept out of line so the write paths stay small; hidden from stack traces.
    [DoesNotReturn]
    [StackTraceHidden]
    static void InvalidOperation(string message)
    {
        throw new InvalidOperationException(message);
    }
}
/// <summary>
/// Represents a bounded channel that supports asynchronous reading and writing.
/// </summary>
/// <typeparam name="T">The type of elements in the channel.</typeparam>
public readonly struct AsyncCh<T> : IAsyncEnumerable<T>, IDisposable
{
    /// <summary>
    /// The underlying channel instance.
    /// </summary>
    public Channel<T> Value { get; }

    // Creates the bounded channel; the full mode is left at the BoundedChannelOptions default.
    internal AsyncCh(int capacity, ChRelaxations relaxations = ChRelaxations.None)
    {
        var opts = new BoundedChannelOptions(capacity)
        {
            SingleReader = relaxations.HasFlag(ChRelaxations.SingleReader),
            SingleWriter = relaxations.HasFlag(ChRelaxations.SingleWriter)
        };
        Value = Channel.CreateBounded<T>(opts);
    }

    /// <summary>
    /// Writes an item to the channel, asynchronously waiting if the channel is full.
    /// </summary>
    /// <param name="item">The item to write to the channel.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public ValueTask Write(T item, CancellationToken ct = default) => Value.Writer.WriteAsync(item, ct);

    /// <summary>
    /// Writes a range of items to the channel in order, asynchronously waiting whenever the channel is full.
    /// </summary>
    /// <param name="items">The items to write to the channel.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ChannelClosedException">
    /// The channel was completed before every item could be written. Items written before
    /// that point remain in the channel. An empty range never throws.
    /// </exception>
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    public async ValueTask WriteRange(ReadOnlyMemory<T> items, CancellationToken ct = default)
    {
        var writer = Value.Writer;
        while (!items.IsEmpty)
        {
            // A false result means the channel will never accept another item. Returning
            // quietly here would lose the rest of the range without telling the caller,
            // whereas Write reports the same situation with ChannelClosedException.
            if (!await writer.WaitToWriteAsync(ct).ConfigureAwait(false))
            {
                throw new ChannelClosedException();
            }

            // Push as many items as currently fit, then go back to waiting for space.
            var written = 0;
            foreach (var item in items.Span)
            {
                if (!writer.TryWrite(item))
                {
                    break;
                }
                written++;
            }
            items = items[written..];
        }
    }

    /// <summary>
    /// Asynchronously reads an item from the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the read operation.</param>
    /// <returns>A task that represents the asynchronous read operation and wraps the item read from the channel.</returns>
    public ValueTask<T> Read(CancellationToken ct = default) => Value.Reader.ReadAsync(ct);

    /// <summary>
    /// Gets an enumerator that asynchronously iterates through the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the enumeration.</param>
    /// <returns>An enumerator of <typeparamref name="T"/> for consuming the channel.</returns>
    [EditorBrowsable(EditorBrowsableState.Never)]
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
    {
        return Value.Reader.ReadAllAsync(ct).GetAsyncEnumerator(ct);
    }

    /// <summary>
    /// Provides a convenient syntax for writing to a channel.
    /// </summary>
    /// <param name="ch">The channel to write to.</param>
    /// <param name="item">The item to write to the channel.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncCh<T> ch, T item) => ch.Write(item);

    /// <summary>
    /// Provides a convenient syntax for writing a range of items to a channel.
    /// </summary>
    /// <param name="ch">The channel to write to.</param>
    /// <param name="items">The items to write to the channel.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncCh<T> ch, ReadOnlyMemory<T> items) => ch.WriteRange(items);

    /// <summary>
    /// Marks the channel as complete, meaning no more items will be written to it.
    /// </summary>
    public void Dispose() => Value.Writer.TryComplete();
}
/// <summary>
/// Factory methods for mailboxes, i.e. channels that come with their own listeners.
/// </summary>
public static class Mailbox
{
    /// <summary>
    /// Builds a bounded mailbox whose posts wait asynchronously while it is full, processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="capacity">Maximum number of items the mailbox holds at once.</param>
    /// <param name="callback">Invoked for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <returns>A new bounded mailbox with its listeners attached.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations plus listener task overhead.
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static AsyncMailbox<T> Bounded<T>(int capacity, Action<T> callback, int readers = 1)
        => new AsyncMailbox<T>(capacity, callback, readers);

    /// <summary>
    /// Builds a bounded mailbox whose posts wait asynchronously while it is full, processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="capacity">Maximum number of items the mailbox holds at once.</param>
    /// <param name="callback">Awaited for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <returns>A new bounded mailbox with its listeners attached.</returns>
    /// <remarks>
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static AsyncMailbox<T> Bounded<T>(
        int capacity,
        Func<T, CancellationToken, ValueTask> callback,
        int readers = 1)
        => new AsyncMailbox<T>(capacity, callback, readers);

    /// <summary>
    /// Builds a bounded mailbox that applies <paramref name="mode"/> when it is full, processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="capacity">Maximum number of items the mailbox holds at once.</param>
    /// <param name="mode">What to do with a post that arrives while the mailbox is full.</param>
    /// <param name="callback">Invoked for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <param name="dropped">Optional callback invoked with each item that gets dropped.</param>
    /// <returns>A new bounded mailbox using the requested full mode.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations plus listener task overhead.
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static Mailbox<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Action<T> callback,
        int readers = 1,
        Action<T>? dropped = null)
        => new Mailbox<T>(capacity, mode, callback, readers, dropped);

    /// <summary>
    /// Builds a bounded mailbox that applies <paramref name="mode"/> when it is full, processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="capacity">Maximum number of items the mailbox holds at once.</param>
    /// <param name="mode">What to do with a post that arrives while the mailbox is full.</param>
    /// <param name="callback">Awaited for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <param name="dropped">Optional callback invoked with each item that gets dropped.</param>
    /// <returns>A new bounded mailbox using the requested full mode.</returns>
    /// <remarks>
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static Mailbox<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Func<T, CancellationToken, ValueTask> callback,
        int readers = 1,
        Action<T>? dropped = null)
        => new Mailbox<T>(capacity, mode, callback, readers, dropped);

    /// <summary>
    /// Builds an unbounded mailbox processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="callback">Invoked for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <returns>A new unbounded mailbox.</returns>
    /// <remarks>
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static Mailbox<T> Unbounded<T>(Action<T> callback, int readers = 1)
        => new Mailbox<T>(callback, readers);

    /// <summary>
    /// Builds an unbounded mailbox processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">Element type carried by the mailbox.</typeparam>
    /// <param name="callback">Awaited for each received item.</param>
    /// <param name="readers">Number of readers that process items.</param>
    /// <returns>A new unbounded mailbox.</returns>
    /// <remarks>
    /// Listening starts as soon as the mailbox is created.
    /// </remarks>
    public static Mailbox<T> Unbounded<T>(Func<T, CancellationToken, ValueTask> callback, int readers = 1)
        => new Mailbox<T>(callback, readers);
}
/// <summary>
/// Represents a mailbox that supports synchronous writing with integrated listening capability.
/// </summary>
/// <typeparam name="T">The type of elements in the mailbox.</typeparam>
public sealed class Mailbox<T> : IAsyncDisposable
{
    // Upper bound on how long DisposeAsync waits for the reader tasks to stop.
    static readonly TimeSpan ReaderShutdownTimeout = TimeSpan.FromSeconds(100);

    readonly Ch<T> channel;
    readonly CancellationTokenSource cancellation;
    readonly Task[] readers;

    // 0 while the mailbox is usable, 1 once DisposeAsync has been entered.
    // Flipped with Interlocked so that exactly one caller performs the teardown.
    int disposed;

    /// <summary>
    /// Gets the number of items waiting to be processed in the mailbox.
    /// </summary>
    public int UnreadCount => channel.Value.Reader.Count;

    // Unbounded mailbox with a synchronous callback.
    internal Mailbox(Action<T> callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Unbounded<T>(RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Unbounded mailbox with an asynchronous callback.
    internal Mailbox(Func<T, CancellationToken, ValueTask> callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Unbounded<T>(RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Bounded mailbox with a synchronous callback.
    internal Mailbox(
        int capacity,
        ChFullMode mode,
        Action<T> callback,
        int readerCount,
        Action<T>? dropped)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Bounded(capacity, mode, dropped, RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Bounded mailbox with an asynchronous callback.
    internal Mailbox(
        int capacity,
        ChFullMode mode,
        Func<T, CancellationToken, ValueTask> callback,
        int readerCount,
        Action<T>? dropped)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Bounded(capacity, mode, dropped, RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // A single reader allows the single-reader relaxation; several readers need the general channel.
    static ChRelaxations RelaxationsFor(int readerCount)
    {
        return readerCount == 1 ? ChRelaxations.SingleReader : ChRelaxations.None;
    }

    // Starts readerCount listener loops that all consume the same channel with a synchronous callback.
    Task[] StartReaders(Action<T> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    // Starts readerCount listener loops that all consume the same channel with an asynchronous callback.
    Task[] StartReaders(Func<T, CancellationToken, ValueTask> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    /// <summary>
    /// Writes an item to the mailbox.
    /// </summary>
    /// <param name="item">The item to write to the mailbox.</param>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the mailbox is closed.</exception>
    public void Post(T item)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        channel.Write(item);
    }

    /// <summary>
    /// Writes a range of items to the mailbox.
    /// </summary>
    /// <param name="items">The items to write to the mailbox.</param>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the mailbox is closed.</exception>
    public void PostRange(ReadOnlySpan<T> items)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        channel.WriteRange(items);
    }

    /// <summary>
    /// Provides a convenient syntax for posting to a mailbox.
    /// </summary>
    /// <param name="item">The item to post to the mailbox.</param>
    public void operator <<=(T item) => Post(item);

    /// <summary>
    /// Provides a convenient syntax for posting a range of items to a mailbox.
    /// </summary>
    /// <param name="items">The items to post to the mailbox.</param>
    public void operator <<=(ReadOnlySpan<T> items) => PostRange(items);

    /// <summary>
    /// Stops the readers, closes the channel and disposes all resources asynchronously.
    /// </summary>
    /// <remarks>
    /// Only the first call performs the teardown; later or concurrent calls return immediately.
    /// Readers are cancelled rather than drained, so items still queued are not processed.
    /// An exception thrown by a reader callback, or a <see cref="TimeoutException"/> when the
    /// readers do not stop in time, is propagated to the caller after the resources are released.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        // Atomic test-and-set: a plain "check, then assign" lets two concurrent callers both
        // pass the check and cancel/dispose the token source twice.
        if (Interlocked.Exchange(ref disposed, 1) != 0)
        {
            return;
        }

        // Signal cancellation to stop readers
        cancellation.Cancel();

        // Wait for readers to complete and dispose resources
        try
        {
            await Task
                .WhenAll(readers)
                .WaitAsync(ReaderShutdownTimeout)
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Readers were cancelled, this is expected. Awaiting rethrows the first underlying
            // exception rather than an AggregateException, so this is the only handler needed.
        }
        finally
        {
            // Close the channel to prevent new items
            channel.Dispose();
            cancellation.Dispose();
        }
    }
}
/// <summary>
/// Represents a bounded mailbox that supports asynchronous reading and writing with integrated listening capability.
/// </summary>
/// <typeparam name="T">The type of elements in the mailbox.</typeparam>
public sealed class AsyncMailbox<T> : IAsyncDisposable
{
    // Upper bound on how long DisposeAsync waits for the reader tasks to stop.
    static readonly TimeSpan ReaderShutdownTimeout = TimeSpan.FromSeconds(100);

    readonly AsyncCh<T> channel;
    readonly CancellationTokenSource cancellation;
    readonly Task[] readers;

    // 0 while the mailbox is usable, 1 once DisposeAsync has been entered.
    // Flipped with Interlocked so that exactly one caller performs the teardown.
    int disposed;

    /// <summary>
    /// Gets the number of items waiting to be processed in the mailbox.
    /// </summary>
    public int UnreadCount => channel.Value.Reader.Count;

    // Bounded mailbox with a synchronous callback.
    internal AsyncMailbox(int capacity, Action<T> callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Bounded<T>(capacity, RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Bounded mailbox with an asynchronous callback.
    internal AsyncMailbox(int capacity, Func<T, CancellationToken, ValueTask> callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        channel = Ch.Bounded<T>(capacity, RelaxationsFor(readerCount));
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // A single reader allows the single-reader relaxation; several readers need the general channel.
    static ChRelaxations RelaxationsFor(int readerCount)
    {
        return readerCount == 1 ? ChRelaxations.SingleReader : ChRelaxations.None;
    }

    // Starts readerCount listener loops that all consume the same channel with a synchronous callback.
    Task[] StartReaders(Action<T> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    // Starts readerCount listener loops that all consume the same channel with an asynchronous callback.
    Task[] StartReaders(Func<T, CancellationToken, ValueTask> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    /// <summary>
    /// Writes an item to the mailbox, asynchronously waiting if the mailbox is full.
    /// </summary>
    /// <param name="item">The item to write to the mailbox.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ObjectDisposedException">Thrown synchronously if the mailbox has been disposed.</exception>
    public ValueTask Post(T item, CancellationToken ct = default)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        return channel.Write(item, ct);
    }

    /// <summary>
    /// Writes a range of items to the mailbox, asynchronously waiting if the mailbox is full.
    /// </summary>
    /// <param name="items">The items to write to the mailbox.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ObjectDisposedException">Thrown synchronously if the mailbox has been disposed.</exception>
    public ValueTask PostRange(ReadOnlyMemory<T> items, CancellationToken ct = default)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        return channel.WriteRange(items, ct);
    }

    /// <summary>
    /// Provides a convenient syntax for posting to a mailbox.
    /// </summary>
    /// <param name="mailbox">The mailbox to post to.</param>
    /// <param name="item">The item to post to the mailbox.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncMailbox<T> mailbox, T item) => mailbox.Post(item);

    /// <summary>
    /// Provides a convenient syntax for posting a range of items to a mailbox.
    /// </summary>
    /// <param name="mailbox">The mailbox to post to.</param>
    /// <param name="items">The items to post to the mailbox.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncMailbox<T> mailbox, ReadOnlyMemory<T> items) => mailbox.PostRange(items);

    /// <summary>
    /// Stops the readers, closes the channel and disposes all resources asynchronously.
    /// </summary>
    /// <remarks>
    /// Only the first call performs the teardown; later or concurrent calls return immediately.
    /// Readers are cancelled rather than drained, so items still queued are not processed.
    /// An exception thrown by a reader callback, or a <see cref="TimeoutException"/> when the
    /// readers do not stop in time, is propagated to the caller after the resources are released.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        // Atomic test-and-set: a plain "check, then assign" lets two concurrent callers both
        // pass the check and cancel/dispose the token source twice.
        if (Interlocked.Exchange(ref disposed, 1) != 0)
        {
            return;
        }

        // Signal cancellation to stop readers
        cancellation.Cancel();

        // Wait for readers to complete and dispose resources
        try
        {
            await Task
                .WhenAll(readers)
                .WaitAsync(ReaderShutdownTimeout)
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Readers were cancelled, this is expected. Awaiting rethrows the first underlying
            // exception rather than an AggregateException, so this is the only handler needed.
        }
        finally
        {
            // Close the channel to prevent new items
            channel.Dispose();
            cancellation.Dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment