Created
March 25, 2026 13:17
-
-
Save neon-sunset/ce5bf77c8b9e315f3f87c611e9e31a0b to your computer and use it in GitHub Desktop.
Mailbox processor in C#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.ComponentModel; | |
| using System.Diagnostics; | |
| using System.Diagnostics.CodeAnalysis; | |
| using System.Runtime.CompilerServices; | |
| using System.Threading.Channels; | |
/// <summary>
/// Usage guarantees a caller can give about a channel. Adhering to them can improve performance.
/// </summary>
[Flags]
public enum ChRelaxations
{
    /// <summary>
    /// No guarantees are given; multiple readers and multiple writers are supported.
    /// </summary>
    None = 0b00,

    /// <summary>
    /// At any point in time there is at most one reader reading from the channel.
    /// </summary>
    SingleReader = 0b01,

    /// <summary>
    /// At any point in time there is at most one writer writing to the channel.
    /// </summary>
    SingleWriter = 0b10,

    /// <summary>
    /// Both <see cref="SingleReader"/> and <see cref="SingleWriter"/> apply.
    /// </summary>
    SingleReaderWriter = SingleWriter | SingleReader,
}
/// <summary>
/// Selects what a bounded channel does with a write that arrives while the channel is full.
/// </summary>
public enum ChFullMode
{
    /// <summary>
    /// The oldest item in the channel is removed to make room for the item being written.
    /// </summary>
    DropOldest = 0,

    /// <summary>
    /// The most recently written item in the channel is replaced by the item being written.
    /// </summary>
    DropNewest = 1,

    /// <summary>
    /// The item being written is rejected; the channel contents stay as they are.
    /// </summary>
    DropWrite = 2
}
/// <summary>
/// Entry point for creating strongly-typed channels used for inter-task messaging,
/// and for attaching background listeners to them.
/// </summary>
public static class Ch
{
    /// <summary>
    /// Creates a bounded channel whose writes asynchronously block while the channel is full.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="capacity">The maximum number of items the channel can hold.</param>
    /// <param name="relaxations">Optional relaxations that can improve performance if adhered to.</param>
    /// <returns>A new bounded channel where writes can be asynchronous.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static AsyncCh<T> Bounded<T>(int capacity, ChRelaxations relaxations = ChRelaxations.None)
        => new AsyncCh<T>(capacity, relaxations);

    /// <summary>
    /// Creates a bounded channel that applies <paramref name="mode"/> when a write arrives
    /// while the channel is full.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="capacity">The maximum number of items the channel can hold.</param>
    /// <param name="mode">The behavior when the channel is full.</param>
    /// <param name="dropped">Optional callback invoked when an item is dropped.</param>
    /// <param name="relaxations">Optional relaxations that can improve performance if adhered to.</param>
    /// <returns>A new bounded channel with the specified full mode behavior.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static Ch<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Action<T>? dropped = null,
        ChRelaxations relaxations = ChRelaxations.None)
        => new Ch<T>(capacity, mode, dropped, relaxations);

    /// <summary>
    /// Creates an unbounded channel that can grow without limits.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="relaxations">Optional relaxations that can improve performance if adhered to.</param>
    /// <returns>A new unbounded channel.</returns>
    /// <remarks>
    /// This operation produces approximately 1.5KB of allocations.
    /// Prefer long-lived channels to avoid excessive allocation traffic.
    /// </remarks>
    public static Ch<T> Unbounded<T>(ChRelaxations relaxations = ChRelaxations.None)
        => new Ch<T>(relaxations);

    /// <summary>
    /// Starts a background listener that passes every item read from the channel to a synchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="ch">The channel to listen to.</param>
    /// <param name="callback">The callback to invoke for each received item.</param>
    /// <param name="ct">Optional cancellation token to stop listening.</param>
    /// <exception cref="ArgumentNullException">Thrown if callback is null.</exception>
    /// <remarks>The listener task is started and then discarded; it is not returned to the caller.</remarks>
    public static void Listen<T>(this Ch<T> ch, Action<T> callback, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Starts a background listener that passes every item read from the channel to an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="ch">The channel to listen to.</param>
    /// <param name="callback">The asynchronous callback to invoke for each received item.</param>
    /// <param name="ct">Optional cancellation token to stop listening.</param>
    /// <exception cref="ArgumentNullException">Thrown if callback is null.</exception>
    /// <remarks>The listener task is started and then discarded; it is not returned to the caller.</remarks>
    public static void Listen<T>(
        this Ch<T> ch,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Starts a background listener that passes every item read from the async channel to a synchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="ch">The async channel to listen to.</param>
    /// <param name="callback">The callback to invoke for each received item.</param>
    /// <param name="ct">Optional cancellation token to stop listening.</param>
    /// <exception cref="ArgumentNullException">Thrown if callback is null.</exception>
    /// <remarks>The listener task is started and then discarded; it is not returned to the caller.</remarks>
    public static void Listen<T>(this AsyncCh<T> ch, Action<T> callback, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }

    /// <summary>
    /// Starts a background listener that passes every item read from the async channel to an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the channel.</typeparam>
    /// <param name="ch">The async channel to listen to.</param>
    /// <param name="callback">The asynchronous callback to invoke for each received item.</param>
    /// <param name="ct">Optional cancellation token to stop listening.</param>
    /// <exception cref="ArgumentNullException">Thrown if callback is null.</exception>
    /// <remarks>The listener task is started and then discarded; it is not returned to the caller.</remarks>
    public static void Listen<T>(
        this AsyncCh<T> ch,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(callback);

        var source = ch.Value.Reader;
        _ = ChStatics<T>.Listener(source, callback, ct);
    }
}
/// <summary>
/// Shared listener loops used by the channel and mailbox types for element type <typeparamref name="T"/>.
/// </summary>
static class ChStatics<T>
{
    // TODO: Implement RentedCh<T> once the need arises.
    // // Each channel is going to take about 800-2KB of memory (assuming it has not grown too large).
    // // We could run into a scenario where we have a particularly large unbounded channel which
    // // roots too much memory, but we are not memory-constrained on most of our systems so it is fine.
    // static readonly int PoolSize = Environment.ProcessorCount * 4;
    // sealed class ChannelPool(UnboundedChannelOptions opts)
    //     : DefaultObjectPool<Channel<T>>(new PooledPolicy(opts), PoolSize);
    // sealed class PooledPolicy(UnboundedChannelOptions opts) : IPooledObjectPolicy<Channel<T>>
    // {
    //     public Channel<T> Create() => Channel.CreateUnbounded<T>(opts);
    //     public bool Return(Channel<T> obj)
    //     {
    //         if (obj.Reader.Count > 0)
    //         {
    //             // We don't want to return a channel that has items in it.
    //             obj.Writer.TryComplete();
    //             return false;
    //         }
    //         return true;
    //     }
    // }

    /// <summary>
    /// Reads from <paramref name="reader"/> until it reports that no more data will arrive,
    /// invoking the synchronous <paramref name="callback"/> for every item.
    /// </summary>
    /// <param name="reader">The channel reader to consume.</param>
    /// <param name="callback">Invoked once per item, in the order items are read.</param>
    /// <param name="ct">Token passed to the wait for more data.</param>
    /// <returns>A task that completes when the reader signals completion.</returns>
    public static async Task Listener(
        ChannelReader<T> reader,
        Action<T> callback,
        CancellationToken ct)
    {
        // Yield first so the caller gets its Task back before any item is processed.
        await Task.Yield();

        while (true)
        {
            var moreData = await reader.WaitToReadAsync(ct).ConfigureAwait(false);
            if (!moreData)
            {
                return;
            }

            // Drain everything currently buffered before waiting again.
            while (reader.TryRead(out var message))
            {
                callback(message);
            }
        }
    }

    /// <summary>
    /// Reads from <paramref name="reader"/> until it reports that no more data will arrive,
    /// awaiting the asynchronous <paramref name="callback"/> for every item.
    /// </summary>
    /// <param name="reader">The channel reader to consume.</param>
    /// <param name="callback">Awaited once per item; receives the item and <paramref name="ct"/>.</param>
    /// <param name="ct">Token passed to the wait for more data and to the callback.</param>
    /// <returns>A task that completes when the reader signals completion.</returns>
    public static async Task Listener(
        ChannelReader<T> reader,
        Func<T, CancellationToken, ValueTask> callback,
        CancellationToken ct)
    {
        // Yield first so the caller gets its Task back before any item is processed.
        await Task.Yield();

        while (true)
        {
            var moreData = await reader.WaitToReadAsync(ct).ConfigureAwait(false);
            if (!moreData)
            {
                return;
            }

            // Drain everything currently buffered before waiting again.
            while (reader.TryRead(out var message))
            {
                await callback(message, ct).ConfigureAwait(false);
            }
        }
    }
}
/// <summary>
/// A channel wrapper whose writes are synchronous and whose reads are asynchronous.
/// </summary>
/// <typeparam name="T">The type of elements in the channel.</typeparam>
public readonly struct Ch<T> : IAsyncEnumerable<T>, IDisposable
{
    /// <summary>
    /// The underlying channel instance.
    /// </summary>
    public Channel<T> Value { get; }

    /// <summary>
    /// Wraps a new unbounded channel configured from <paramref name="relaxations"/>.
    /// </summary>
    internal Ch(ChRelaxations relaxations)
    {
        Value = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
        {
            SingleReader = (relaxations & ChRelaxations.SingleReader) != 0,
            SingleWriter = (relaxations & ChRelaxations.SingleWriter) != 0
        });
    }

    /// <summary>
    /// Wraps a new bounded channel with the given capacity, full-mode behavior and optional drop callback.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="mode"/> is not a defined value.</exception>
    internal Ch(int capacity, ChFullMode mode, Action<T>? dropped, ChRelaxations relaxations)
    {
        // Options are created first so capacity is validated before the mode is mapped.
        var options = new BoundedChannelOptions(capacity);

        switch (mode)
        {
            case ChFullMode.DropOldest:
                options.FullMode = BoundedChannelFullMode.DropOldest;
                break;
            case ChFullMode.DropNewest:
                options.FullMode = BoundedChannelFullMode.DropNewest;
                break;
            case ChFullMode.DropWrite:
                options.FullMode = BoundedChannelFullMode.DropWrite;
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(mode), mode, null);
        }

        options.SingleReader = (relaxations & ChRelaxations.SingleReader) != 0;
        options.SingleWriter = (relaxations & ChRelaxations.SingleWriter) != 0;

        Value = Channel.CreateBounded<T>(options, dropped);
    }

    /// <summary>
    /// Writes a single item to the channel.
    /// </summary>
    /// <param name="item">The item to write to the channel.</param>
    /// <exception cref="InvalidOperationException">Thrown if the channel is closed.</exception>
    public void Write(T item)
    {
        if (Value.Writer.TryWrite(item))
            return;

        ThrowClosed();
    }

    /// <summary>
    /// Writes every item of <paramref name="items"/> to the channel, in order.
    /// </summary>
    /// <param name="items">The items to write to the channel.</param>
    /// <exception cref="InvalidOperationException">Thrown if the channel is closed.</exception>
    public void WriteRange(ReadOnlySpan<T> items)
    {
        for (var index = 0; index < items.Length; index++)
        {
            Write(items[index]);
        }
    }

    /// <summary>
    /// Asynchronously reads one item from the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the read operation.</param>
    /// <returns>A task that represents the asynchronous read operation and wraps the item read from the channel.</returns>
    public ValueTask<T> Read(CancellationToken ct = default)
    {
        return Value.Reader.ReadAsync(ct);
    }

    /// <summary>
    /// Returns an enumerator that asynchronously yields the items read from the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the enumeration.</param>
    /// <returns>An enumerator of <typeparamref name="T"/> for consuming the channel.</returns>
    [EditorBrowsable(EditorBrowsableState.Never)]
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
        => Value.Reader.ReadAllAsync(ct).GetAsyncEnumerator(ct);

    /// <summary>
    /// Shorthand for <see cref="Write(T)"/>.
    /// </summary>
    /// <param name="item">The item to write to the channel.</param>
    public void operator <<=(T item)
    {
        Write(item);
    }

    /// <summary>
    /// Shorthand for <see cref="WriteRange(ReadOnlySpan{T})"/>.
    /// </summary>
    /// <param name="items">The items to write to the channel.</param>
    public void operator <<=(ReadOnlySpan<T> items)
    {
        WriteRange(items);
    }

    /// <summary>
    /// Marks the channel as complete, meaning no more items will be written to it.
    /// </summary>
    public void Dispose() => Value.Writer.TryComplete();

    // Throw helper kept out of line so the write path stays small.
    [DoesNotReturn, StackTraceHidden]
    static void ThrowClosed() => throw new InvalidOperationException("Cannot write to a closed channel.");
}
/// <summary>
/// Represents a bounded channel that supports asynchronous reading and writing.
/// </summary>
/// <typeparam name="T">The type of elements in the channel.</typeparam>
public readonly struct AsyncCh<T> : IAsyncEnumerable<T>, IDisposable
{
    /// <summary>
    /// The underlying channel instance.
    /// </summary>
    public Channel<T> Value { get; }

    /// <summary>
    /// Wraps a new bounded channel with the given capacity, configured from <paramref name="relaxations"/>.
    /// </summary>
    internal AsyncCh(int capacity, ChRelaxations relaxations = ChRelaxations.None)
    {
        var opts = new BoundedChannelOptions(capacity)
        {
            SingleReader = relaxations.HasFlag(ChRelaxations.SingleReader),
            SingleWriter = relaxations.HasFlag(ChRelaxations.SingleWriter)
        };
        Value = Channel.CreateBounded<T>(opts);
    }

    /// <summary>
    /// Writes an item to the channel, asynchronously blocking if the channel is full.
    /// </summary>
    /// <param name="item">The item to write to the channel.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public ValueTask Write(T item, CancellationToken ct = default) => Value.Writer.WriteAsync(item, ct);

    /// <summary>
    /// Writes a range of items to the channel in order, asynchronously blocking whenever the channel is full.
    /// </summary>
    /// <param name="items">The items to write to the channel.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ChannelClosedException">
    /// Thrown if the channel is closed while items remain to be written. Items written before
    /// the channel closed stay in the channel.
    /// </exception>
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    public async ValueTask WriteRange(ReadOnlyMemory<T> items, CancellationToken ct = default)
    {
        while (!items.IsEmpty)
        {
            // A false result means the channel will never accept another item. Returning
            // normally here would silently drop the remaining items, so surface it the same
            // way a single-item write to a closed channel does.
            if (!await Value.Writer.WaitToWriteAsync(ct).ConfigureAwait(false))
                throw new ChannelClosedException();

            // Write as many items as currently fit; the rest is retried after the next wait.
            var written = 0;
            foreach (var item in items.Span)
            {
                if (!Value.Writer.TryWrite(item))
                    break;
                written++;
            }
            items = items[written..];
        }
    }

    /// <summary>
    /// Asynchronously reads an item from the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the read operation.</param>
    /// <returns>A task that represents the asynchronous read operation and wraps the item read from the channel.</returns>
    public ValueTask<T> Read(CancellationToken ct = default) => Value.Reader.ReadAsync(ct);

    /// <summary>
    /// Gets an enumerator that asynchronously iterates through the channel.
    /// </summary>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the enumeration.</param>
    /// <returns>An enumerator of <typeparamref name="T"/> for consuming the channel.</returns>
    [EditorBrowsable(EditorBrowsableState.Never)]
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
    {
        return Value.Reader.ReadAllAsync(ct).GetAsyncEnumerator(ct);
    }

    /// <summary>
    /// Provides a convenient syntax for writing to a channel.
    /// </summary>
    /// <param name="ch">The channel to write to.</param>
    /// <param name="item">The item to write to the channel.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncCh<T> ch, T item) => ch.Write(item);

    /// <summary>
    /// Provides a convenient syntax for writing a range of items to a channel.
    /// </summary>
    /// <param name="ch">The channel to write to.</param>
    /// <param name="items">The items to write to the channel.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncCh<T> ch, ReadOnlyMemory<T> items) => ch.WriteRange(items);

    /// <summary>
    /// Marks the channel as complete, meaning no more items will be written to it.
    /// </summary>
    public void Dispose() => Value.Writer.TryComplete();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Entry point for creating mailboxes: channels that come with their own listeners attached.
/// </summary>
public static class Mailbox
{
    /// <summary>
    /// Creates a bounded mailbox whose posts asynchronously block while the mailbox is full,
    /// processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="capacity">The maximum number of items the mailbox can hold.</param>
    /// <param name="callback">The callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <returns>A new bounded mailbox with integrated listener.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations plus listener task overhead.
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static AsyncMailbox<T> Bounded<T>(int capacity, Action<T> callback, int readers = 1)
        => new AsyncMailbox<T>(capacity, callback, readers);

    /// <summary>
    /// Creates a bounded mailbox whose posts asynchronously block while the mailbox is full,
    /// processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="capacity">The maximum number of items the mailbox can hold.</param>
    /// <param name="callback">The asynchronous callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <returns>A new bounded mailbox with integrated listener.</returns>
    /// <remarks>
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static AsyncMailbox<T> Bounded<T>(
        int capacity,
        Func<T, CancellationToken, ValueTask> callback,
        int readers = 1)
        => new AsyncMailbox<T>(capacity, callback, readers);

    /// <summary>
    /// Creates a bounded mailbox that applies <paramref name="mode"/> when it is full,
    /// processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="capacity">The maximum number of items the mailbox can hold.</param>
    /// <param name="mode">The behavior when the mailbox is full.</param>
    /// <param name="callback">The callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <param name="dropped">Optional callback invoked when an item is dropped.</param>
    /// <returns>A new bounded mailbox with the specified full mode behavior.</returns>
    /// <remarks>
    /// This operation produces approximately 800B of allocations plus listener task overhead.
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static Mailbox<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Action<T> callback,
        int readers = 1,
        Action<T>? dropped = null)
        => new Mailbox<T>(capacity, mode, callback, readers, dropped);

    /// <summary>
    /// Creates a bounded mailbox that applies <paramref name="mode"/> when it is full,
    /// processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="capacity">The maximum number of items the mailbox can hold.</param>
    /// <param name="mode">The behavior when the mailbox is full.</param>
    /// <param name="callback">The asynchronous callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <param name="dropped">Optional callback invoked when an item is dropped.</param>
    /// <returns>A new bounded mailbox with the specified full mode behavior.</returns>
    /// <remarks>
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static Mailbox<T> Bounded<T>(
        int capacity,
        ChFullMode mode,
        Func<T, CancellationToken, ValueTask> callback,
        int readers = 1,
        Action<T>? dropped = null)
        => new Mailbox<T>(capacity, mode, callback, readers, dropped);

    /// <summary>
    /// Creates an unbounded mailbox that can grow without limits, processed by a synchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="callback">The callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <returns>A new unbounded mailbox.</returns>
    /// <remarks>
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static Mailbox<T> Unbounded<T>(Action<T> callback, int readers = 1)
        => new Mailbox<T>(callback, readers);

    /// <summary>
    /// Creates an unbounded mailbox that can grow without limits, processed by an asynchronous callback.
    /// </summary>
    /// <typeparam name="T">The type of elements in the mailbox.</typeparam>
    /// <param name="callback">The asynchronous callback to invoke for each received item.</param>
    /// <param name="readers">The number of readers that will be processing items.</param>
    /// <returns>A new unbounded mailbox.</returns>
    /// <remarks>
    /// The mailbox starts listening as soon as it is created.
    /// </remarks>
    public static Mailbox<T> Unbounded<T>(Func<T, CancellationToken, ValueTask> callback, int readers = 1)
        => new Mailbox<T>(callback, readers);
}
/// <summary>
/// Represents a mailbox that supports synchronous writing with integrated listening capability.
/// </summary>
/// <typeparam name="T">The type of elements in the mailbox.</typeparam>
public sealed class Mailbox<T> : IAsyncDisposable
{
    // Upper bound on how long DisposeAsync waits for the reader tasks to stop.
    static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(100);

    readonly Ch<T> channel;
    readonly CancellationTokenSource cancellation;
    readonly Task[] readers;

    // 0 = live, 1 = disposed. An int (not a bool) so DisposeAsync can claim disposal
    // atomically with Interlocked.Exchange.
    int disposed;

    /// <summary>
    /// Gets the number of items waiting to be processed in the mailbox.
    /// </summary>
    public int UnreadCount => channel.Value.Reader.Count;

    /// <summary>
    /// Creates an unbounded mailbox processed by a synchronous callback and starts its readers.
    /// </summary>
    internal Mailbox(Action<T> callback, int readerCount)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Unbounded<T>(relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    /// <summary>
    /// Creates an unbounded mailbox processed by an asynchronous callback and starts its readers.
    /// </summary>
    internal Mailbox(Func<T, CancellationToken, ValueTask> callback, int readerCount)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Unbounded<T>(relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    /// <summary>
    /// Creates a bounded mailbox processed by a synchronous callback and starts its readers.
    /// </summary>
    internal Mailbox(
        int capacity,
        ChFullMode mode,
        Action<T> callback,
        int readerCount,
        Action<T>? dropped)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Bounded(capacity, mode, dropped, relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    /// <summary>
    /// Creates a bounded mailbox processed by an asynchronous callback and starts its readers.
    /// </summary>
    internal Mailbox(
        int capacity,
        ChFullMode mode,
        Func<T, CancellationToken, ValueTask> callback,
        int readerCount,
        Action<T>? dropped)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Bounded(capacity, mode, dropped, relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Shared constructor validation: rejects a null callback or fewer than one reader, then
    // picks the single-reader relaxation when exactly one reader will consume the channel.
    static ChRelaxations ValidateAndSelectRelaxations(Delegate callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        return readerCount == 1
            ? ChRelaxations.SingleReader
            : ChRelaxations.None;
    }

    // Starts readerCount listener loops that feed the synchronous callback.
    Task[] StartReaders(Action<T> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    // Starts readerCount listener loops that feed the asynchronous callback.
    Task[] StartReaders(Func<T, CancellationToken, ValueTask> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    /// <summary>
    /// Writes an item to the mailbox.
    /// </summary>
    /// <param name="item">The item to write to the mailbox.</param>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the mailbox is closed.</exception>
    public void Post(T item)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        channel.Write(item);
    }

    /// <summary>
    /// Writes a range of items to the mailbox.
    /// </summary>
    /// <param name="items">The items to write to the mailbox.</param>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown if the mailbox is closed.</exception>
    public void PostRange(ReadOnlySpan<T> items)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        channel.WriteRange(items);
    }

    /// <summary>
    /// Provides a convenient syntax for posting to a mailbox.
    /// </summary>
    /// <param name="item">The item to post to the mailbox.</param>
    public void operator <<=(T item) => Post(item);

    /// <summary>
    /// Provides a convenient syntax for posting a range of items to a mailbox.
    /// </summary>
    /// <param name="items">The items to post to the mailbox.</param>
    public void operator <<=(ReadOnlySpan<T> items) => PostRange(items);

    /// <summary>
    /// Stops the readers, closes the channel and disposes all resources asynchronously.
    /// </summary>
    /// <remarks>
    /// Only the first call performs the shutdown; later or concurrent calls return immediately.
    /// Readers are stopped through cancellation, so items still queued may be left unprocessed.
    /// An exception thrown by a callback is rethrown from this method.
    /// </remarks>
    /// <exception cref="TimeoutException">Thrown if the readers do not stop within 100 seconds.</exception>
    public async ValueTask DisposeAsync()
    {
        // Atomically claim disposal so two concurrent callers cannot both run the shutdown
        // (the loser would otherwise call Cancel() on an already disposed token source).
        if (Interlocked.Exchange(ref disposed, 1) != 0)
            return;

        try
        {
            // Signal cancellation to stop readers
            cancellation.Cancel();

            // Wait for readers to complete
            await Task
                .WhenAll(readers)
                .WaitAsync(ShutdownTimeout)
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Readers were cancelled, this is expected. Awaiting the combined task surfaces
            // cancellation as OperationCanceledException, never as AggregateException.
        }
        finally
        {
            // Close the channel to prevent new items
            channel.Dispose();
            cancellation.Dispose();
        }
    }
}
/// <summary>
/// Represents a bounded mailbox that supports asynchronous reading and writing with integrated listening capability.
/// </summary>
/// <typeparam name="T">The type of elements in the mailbox.</typeparam>
public sealed class AsyncMailbox<T> : IAsyncDisposable
{
    // Upper bound on how long DisposeAsync waits for the reader tasks to stop.
    static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(100);

    readonly AsyncCh<T> channel;
    readonly CancellationTokenSource cancellation;
    readonly Task[] readers;

    // 0 = live, 1 = disposed. An int (not a bool) so DisposeAsync can claim disposal
    // atomically with Interlocked.Exchange.
    int disposed;

    /// <summary>
    /// Gets the number of items waiting to be processed in the mailbox.
    /// </summary>
    public int UnreadCount => channel.Value.Reader.Count;

    /// <summary>
    /// Creates a bounded mailbox processed by a synchronous callback and starts its readers.
    /// </summary>
    internal AsyncMailbox(int capacity, Action<T> callback, int readerCount)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Bounded<T>(capacity, relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    /// <summary>
    /// Creates a bounded mailbox processed by an asynchronous callback and starts its readers.
    /// </summary>
    internal AsyncMailbox(int capacity, Func<T, CancellationToken, ValueTask> callback, int readerCount)
    {
        var relaxations = ValidateAndSelectRelaxations(callback, readerCount);
        channel = Ch.Bounded<T>(capacity, relaxations);
        cancellation = new CancellationTokenSource();
        readers = StartReaders(callback, readerCount, cancellation.Token);
    }

    // Shared constructor validation: rejects a null callback or fewer than one reader, then
    // picks the single-reader relaxation when exactly one reader will consume the channel.
    static ChRelaxations ValidateAndSelectRelaxations(Delegate callback, int readerCount)
    {
        ArgumentNullException.ThrowIfNull(callback);
        ArgumentOutOfRangeException.ThrowIfLessThan(readerCount, 1);
        return readerCount == 1
            ? ChRelaxations.SingleReader
            : ChRelaxations.None;
    }

    // Starts readerCount listener loops that feed the synchronous callback.
    Task[] StartReaders(Action<T> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    // Starts readerCount listener loops that feed the asynchronous callback.
    Task[] StartReaders(Func<T, CancellationToken, ValueTask> callback, int readerCount, CancellationToken ct)
    {
        var tasks = new Task[readerCount];
        for (var i = 0; i < readerCount; i++)
        {
            tasks[i] = ChStatics<T>.Listener(channel.Value.Reader, callback, ct);
        }
        return tasks;
    }

    /// <summary>
    /// Writes an item to the mailbox, asynchronously blocking if the mailbox is full.
    /// </summary>
    /// <param name="item">The item to write to the mailbox.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    public ValueTask Post(T item, CancellationToken ct = default)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        return channel.Write(item, ct);
    }

    /// <summary>
    /// Writes a range of items to the mailbox, asynchronously blocking if the mailbox is full.
    /// </summary>
    /// <param name="items">The items to write to the mailbox.</param>
    /// <param name="ct">A <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    /// <exception cref="ObjectDisposedException">Thrown if the mailbox has been disposed.</exception>
    public ValueTask PostRange(ReadOnlyMemory<T> items, CancellationToken ct = default)
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref disposed) != 0, this);
        return channel.WriteRange(items, ct);
    }

    /// <summary>
    /// Provides a convenient syntax for posting to a mailbox.
    /// </summary>
    /// <param name="mailbox">The mailbox to post to.</param>
    /// <param name="item">The item to post to the mailbox.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncMailbox<T> mailbox, T item) => mailbox.Post(item);

    /// <summary>
    /// Provides a convenient syntax for posting a range of items to a mailbox.
    /// </summary>
    /// <param name="mailbox">The mailbox to post to.</param>
    /// <param name="items">The items to post to the mailbox.</param>
    /// <returns>A task that represents the asynchronous write operation.</returns>
    public static ValueTask operator <<(AsyncMailbox<T> mailbox, ReadOnlyMemory<T> items) => mailbox.PostRange(items);

    /// <summary>
    /// Stops the readers, closes the channel and disposes all resources asynchronously.
    /// </summary>
    /// <remarks>
    /// Only the first call performs the shutdown; later or concurrent calls return immediately.
    /// Readers are stopped through cancellation, so items still queued may be left unprocessed.
    /// An exception thrown by a callback is rethrown from this method.
    /// </remarks>
    /// <exception cref="TimeoutException">Thrown if the readers do not stop within 100 seconds.</exception>
    public async ValueTask DisposeAsync()
    {
        // Atomically claim disposal so two concurrent callers cannot both run the shutdown
        // (the loser would otherwise call Cancel() on an already disposed token source).
        if (Interlocked.Exchange(ref disposed, 1) != 0)
            return;

        try
        {
            // Signal cancellation to stop readers
            cancellation.Cancel();

            // Wait for readers to complete
            await Task
                .WhenAll(readers)
                .WaitAsync(ShutdownTimeout)
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Readers were cancelled, this is expected. Awaiting the combined task surfaces
            // cancellation as OperationCanceledException, never as AggregateException.
        }
        finally
        {
            // Close the channel to prevent new items
            channel.Dispose();
            cancellation.Dispose();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment