Skip to content

Instantly share code, notes, and snippets.

@egil
Last active October 10, 2024 20:00
Show Gist options
  • Save egil/c517eba3aacb60777e629eff4743c80a to your computer and use it in GitHub Desktop.
Save egil/c517eba3aacb60777e629eff4743c80a to your computer and use it in GitHub Desktop.
A multi-cast IAsyncEnumerable<T>, where each reader/subscriber/iterator gets their own copy of the items passed to the enumerable, and can process them asynchronously at their own pace. Since it implements `IAsyncEnumerable<T>`, users can use all the LINQ style operators available in the System.Linq.Async package for easy filtering, projecting, …
/// <summary>
/// Represents a multi-cast <see cref="IAsyncEnumerable{T}"/> where
/// each reader can consume the <typeparamref name="T"/> items
/// at its own pace.
/// </summary>
/// <typeparam name="T">The item type produced by the enumerable.</typeparam>
public sealed class MulticastAsyncEnumerable<T> : IAsyncEnumerable<T>
{
private readonly UnboundedChannelOptions channelOptions;
private readonly object activeChannelsLock = new object();
private ImmutableArray<Channel<T>> activeChannels = ImmutableArray<Channel<T>>.Empty;
public MulticastAsyncEnumerable()
{
channelOptions = new()
{
AllowSynchronousContinuations = false,
SingleReader = false,
SingleWriter = true,
};
}
/// <summary>
/// Writes the <paramref name="item"/> to any readers.
/// </summary>
/// <param name="item">The item to write.</param>
public void Write(T item)
{
foreach (var channel in activeChannels)
{
channel.Writer.TryWrite(item);
}
}
/// <summary>
/// Mark all <see cref="IAsyncEnumerable{T}"/> streams
/// as completed.
/// </summary>
public void Complete()
{
var channels = activeChannels;
lock (activeChannelsLock)
{
activeChannels = ImmutableArray<Channel<T>>.Empty;
}
foreach (var channel in channels)
{
channel.Writer.TryComplete();
}
}
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
var reader = Subscribe();
try
{
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (reader.TryRead(out T? item))
{
yield return item;
}
}
}
finally
{
Unsubscribe(reader);
}
}
private ChannelReader<T> Subscribe()
{
var channel = Channel.CreateUnbounded<T>(channelOptions);
lock (activeChannelsLock)
{
activeChannels = activeChannels.Add(channel);
}
return channel.Reader;
}
private void Unsubscribe(ChannelReader<T> reader)
{
if (activeChannels.FirstOrDefault(x => ReferenceEquals(x.Reader, reader)) is Channel<T> channel)
{
lock (activeChannelsLock)
{
activeChannels = activeChannels.Remove(channel);
}
channel.Writer.TryComplete();
}
}
}
@agehrke
Copy link

agehrke commented Oct 15, 2022

Thanks for a good thread on Twitter. I have been over this concept several times, and like you mentioned, Rx seems like a good fit, but I find it a too large dependency to take on if not already in library/service.

Maybe your implementation could use some Interlocked.CompareExchange() for setting activeChannels field to be truly “thread-safe”, as there could be concurrent invocations of eg Subscribe() and Unsubscribe().

@egil
Copy link
Author

egil commented Oct 15, 2022

@agehrke thanks. I actually tried that briefly with Interlocked.Exchange but could not get it to compile because ImmutableArray is a struct.

Perhaps I didn't hold it right.

@egil
Copy link
Author

egil commented Oct 15, 2022

@agehrke, no, still didn't get it to work with Interlocked.Exchange. However, have updated the code to use a regular lock around the bits that update the activeChannels. My usage/assumption is that this is going to see many many more calls to the Write method than to the others, so it's not going to hurt perf.

@agehrke
Copy link

agehrke commented Oct 16, 2022

Oh dooh. Yes, the lock looks good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment