Skip to content

Instantly share code, notes, and snippets.

@fubar-coder
Last active November 1, 2024 16:14
Show Gist options
  • Save fubar-coder/003cc8f9457de103f38dcb6a3682515b to your computer and use it in GitHub Desktop.
Save fubar-coder/003cc8f9457de103f38dcb6a3682515b to your computer and use it in GitHub Desktop.
ChannelReader as IObservable
using System.Reactive.Linq;
using System.Threading.Channels;
public static class ChannelReaderExtensions
{
public static IObservable<T> ToObservable<T>(this ChannelReader<T> reader)
{
return Observable.Create<T>(async (observer, ct) =>
{
try
{
for (;;)
{
var item = await reader.ReadAsync(ct);
observer.OnNext(item);
}
}
catch (ChannelClosedException)
{
observer.OnCompleted();
}
catch (OperationCanceledException)
{
observer.OnCompleted();
}
catch (Exception exception)
{
observer.OnError(exception);
}
});
}
}

Intention

Wraps a ChannelReader<T> in an IObservable<T>

License

MIT

Usage

using System.Reactive.Linq;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(1);

var reader = channel.Reader.ToObservable();

using (reader
           .Do(x => Console.WriteLine($"Received {x}"))
           .Subscribe())
{
    for (var i = 0; i != 1000; ++i)
    {
        Console.WriteLine($"Writing {i}");
        await channel.Writer.WriteAsync(i);
        Console.WriteLine($"Written {i}");
    }

    channel.Writer.Complete();

    Console.WriteLine("Press key to end");
    Console.ReadLine();
}

Console.WriteLine("Finished");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment