Last active
October 10, 2024 20:00
-
-
Save egil/c517eba3aacb60777e629eff4743c80a to your computer and use it in GitHub Desktop.
A multi-cast IAsyncEnumerable<T>, where each reader/subscriber/iterator gets their own copy of the items passed to the enumerable, and can process them asynchronously at their own pace. Since it implements `IAsyncEnumerable<T>`, users can use all the LINQ style operators available in the System.Linq.Async package for easy filtering, projecting, …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Represents a multi-cast <see cref="IAsyncEnumerable{T}"/> where | |
/// each reader can consume the <typeparamref name="T"/> items | |
/// at its own pace. | |
/// </summary> | |
/// <typeparam name="T">The item type produced by the enumerable.</typeparam> | |
public sealed class MulticastAsyncEnumerable<T> : IAsyncEnumerable<T> | |
{ | |
private readonly UnboundedChannelOptions channelOptions; | |
private readonly object activeChannelsLock = new object(); | |
private ImmutableArray<Channel<T>> activeChannels = ImmutableArray<Channel<T>>.Empty; | |
public MulticastAsyncEnumerable() | |
{ | |
channelOptions = new() | |
{ | |
AllowSynchronousContinuations = false, | |
SingleReader = false, | |
SingleWriter = true, | |
}; | |
} | |
/// <summary> | |
/// Writes the <paramref name="item"/> to any readers. | |
/// </summary> | |
/// <param name="item">The item to write.</param> | |
public void Write(T item) | |
{ | |
foreach (var channel in activeChannels) | |
{ | |
channel.Writer.TryWrite(item); | |
} | |
} | |
/// <summary> | |
/// Mark all <see cref="IAsyncEnumerable{T}"/> streams | |
/// as completed. | |
/// </summary> | |
public void Complete() | |
{ | |
var channels = activeChannels; | |
lock (activeChannelsLock) | |
{ | |
activeChannels = ImmutableArray<Channel<T>>.Empty; | |
} | |
foreach (var channel in channels) | |
{ | |
channel.Writer.TryComplete(); | |
} | |
} | |
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) | |
{ | |
var reader = Subscribe(); | |
try | |
{ | |
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) | |
{ | |
while (reader.TryRead(out T? item)) | |
{ | |
yield return item; | |
} | |
} | |
} | |
finally | |
{ | |
Unsubscribe(reader); | |
} | |
} | |
private ChannelReader<T> Subscribe() | |
{ | |
var channel = Channel.CreateUnbounded<T>(channelOptions); | |
lock (activeChannelsLock) | |
{ | |
activeChannels = activeChannels.Add(channel); | |
} | |
return channel.Reader; | |
} | |
private void Unsubscribe(ChannelReader<T> reader) | |
{ | |
if (activeChannels.FirstOrDefault(x => ReferenceEquals(x.Reader, reader)) is Channel<T> channel) | |
{ | |
lock (activeChannelsLock) | |
{ | |
activeChannels = activeChannels.Remove(channel); | |
} | |
channel.Writer.TryComplete(); | |
} | |
} | |
} |
@agehrke, no, still didn't get it to work with Interlocked.Exchange. However, have updated the code to use a regular lock around the bits that update the activeChannels
. My usage/assumption is that this is going to see many many more calls to the Write
method than to the others, so it's not going to hurt perf.
Oh dooh. Yes, the lock looks good.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@agehrke thanks. I actually tried that briefly with
Interlocked.Exchange
but could not get it to compile becauseImmutableArray
is a struct.Perhaps I didn't hold it right.