Last active
October 10, 2024 20:00
Gist: egil/c517eba3aacb60777e629eff4743c80a
A multicast `IAsyncEnumerable<T>` where each reader/subscriber/iterator gets its own copy of the items passed to the enumerable and can process them asynchronously at its own pace. Since it implements `IAsyncEnumerable<T>`, users can use all the LINQ-style operators available in the System.Linq.Async package for easy filtering, projecting, …
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Channels;

/// <summary>
/// Represents a multi-cast <see cref="IAsyncEnumerable{T}"/> where
/// each reader can consume the <typeparamref name="T"/> items
/// at its own pace.
/// </summary>
/// <typeparam name="T">The item type produced by the enumerable.</typeparam>
public sealed class MulticastAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly UnboundedChannelOptions channelOptions;
    private readonly object activeChannelsLock = new object();
    private ImmutableArray<Channel<T>> activeChannels = ImmutableArray<Channel<T>>.Empty;

    public MulticastAsyncEnumerable()
    {
        channelOptions = new()
        {
            AllowSynchronousContinuations = false,
            SingleReader = false,
            SingleWriter = true,
        };
    }

    /// <summary>
    /// Writes the <paramref name="item"/> to any readers.
    /// </summary>
    /// <param name="item">The item to write.</param>
    public void Write(T item)
    {
        // Hot path: iterates a snapshot of the immutable array without taking the lock.
        foreach (var channel in activeChannels)
        {
            channel.Writer.TryWrite(item);
        }
    }

    /// <summary>
    /// Mark all <see cref="IAsyncEnumerable{T}"/> streams
    /// as completed.
    /// </summary>
    public void Complete()
    {
        ImmutableArray<Channel<T>> channels;
        lock (activeChannelsLock)
        {
            // Take the snapshot inside the lock so that a channel added by a
            // concurrent Subscribe() is not dropped without being completed.
            channels = activeChannels;
            activeChannels = ImmutableArray<Channel<T>>.Empty;
        }

        foreach (var channel in channels)
        {
            channel.Writer.TryComplete();
        }
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        // The subscription is created when enumeration starts, not when this method is called.
        var reader = Subscribe();
        try
        {
            while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            {
                while (reader.TryRead(out T? item))
                {
                    yield return item;
                }
            }
        }
        finally
        {
            Unsubscribe(reader);
        }
    }

    private ChannelReader<T> Subscribe()
    {
        var channel = Channel.CreateUnbounded<T>(channelOptions);
        lock (activeChannelsLock)
        {
            activeChannels = activeChannels.Add(channel);
        }

        return channel.Reader;
    }

    private void Unsubscribe(ChannelReader<T> reader)
    {
        if (activeChannels.FirstOrDefault(x => ReferenceEquals(x.Reader, reader)) is Channel<T> channel)
        {
            lock (activeChannelsLock)
            {
                activeChannels = activeChannels.Remove(channel);
            }

            channel.Writer.TryComplete();
        }
    }
}
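A usage sketch may help show the fan-out behaviour. It assumes the `MulticastAsyncEnumerable<T>` class above is compiled into the same project; `MulticastDemo`, `RunAsync`, and `CollectAsync` are hypothetical names used only for illustration. Note that a reader subscribes only once its enumeration has started, so the sketch starts both readers before writing anything.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo type; relies on MulticastAsyncEnumerable<T> from the gist above.
public static class MulticastDemo
{
    public static async Task<(List<int> First, List<int> Second)> RunAsync()
    {
        var source = new MulticastAsyncEnumerable<int>();

        // Each call runs synchronously up to the first pending await, which is
        // after the reader has subscribed, so both readers are registered here.
        Task<List<int>> first = CollectAsync(source);
        Task<List<int>> second = CollectAsync(source);

        source.Write(1);
        source.Write(2);

        // Completing the source ends both "await foreach" loops.
        source.Complete();

        return (await first, await second);
    }

    // Drains one independent subscription into a list.
    private static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> items)
    {
        var result = new List<int>();
        await foreach (var item in items)
        {
            result.Add(item);
        }

        return result;
    }
}
```

Both lists should end up holding every written item, since each reader has its own channel. Items written before a reader starts enumerating are not replayed to it.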
@agehrke thanks. I actually tried that briefly with `Interlocked.Exchange` but could not get it to compile because `ImmutableArray` is a struct. Perhaps I didn't hold it right.
@agehrke, no, I still didn't get it to work with `Interlocked.Exchange`. However, I have updated the code to use a regular lock around the bits that update `activeChannels`. My usage/assumption is that this is going to see many, many more calls to the `Write` method than to the others, and `Write` does not take the lock, so it's not going to hurt perf.
Oh dooh. Yes, the lock looks good.
Thanks for a good thread on Twitter. I have been over this concept several times, and like you mentioned, Rx seems like a good fit, but I find it too large a dependency to take on if it is not already in the library/service.
Maybe your implementation could use some `Interlocked.CompareExchange()` for setting the `activeChannels` field to be truly “thread-safe”, as there could be concurrent invocations of e.g. `Subscribe()` and `Unsubscribe()`.
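For reference, a lock-free variant of the subscriber bookkeeping can be sketched with `ImmutableInterlocked` from System.Collections.Immutable, which offers `Update` and `InterlockedExchange` overloads that accept a `ref ImmutableArray<T>`. The generic `Interlocked.Exchange<T>` has historically been constrained to reference types, which would explain the compile error with the `ImmutableArray<T>` struct. `ChannelRegistry<T>` and its members are hypothetical names, not part of the gist, and this is a minimal sketch rather than a drop-in replacement.

```csharp
using System.Collections.Immutable;
using System.Threading.Channels;

// Hypothetical helper: tracks subscriber channels without a lock.
public sealed class ChannelRegistry<T>
{
    private ImmutableArray<Channel<T>> channels = ImmutableArray<Channel<T>>.Empty;

    // Number of channels currently registered.
    public int Count => channels.Length;

    // Creates a channel and adds it to the array. Update re-runs the
    // transform until its compare-and-swap succeeds.
    public Channel<T> Add()
    {
        var channel = Channel.CreateUnbounded<T>();
        ImmutableInterlocked.Update(ref channels, (list, item) => list.Add(item), channel);
        return channel;
    }

    // Returns true if the channel was registered and has now been removed.
    public bool Remove(Channel<T> channel)
        => ImmutableInterlocked.Update(ref channels, (list, item) => list.Remove(item), channel);

    // Atomically swaps in an empty array and returns the previous contents,
    // so the caller can complete every channel that was registered.
    public ImmutableArray<Channel<T>> Clear()
        => ImmutableInterlocked.InterlockedExchange(ref channels, ImmutableArray<Channel<T>>.Empty);
}
```

Whether this is worth it over the lock is a judgment call: the lock only guards subscribe, unsubscribe, and complete, and `Write` already reads the array without locking.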