@Tirael
Created November 1, 2024 14:22
SubscribeAsync
// https://github.com/dotnet/reactive/issues/459
// https://github.com/dotnet/reactive/issues/459#issuecomment-555607144
/*
Observable.FromAsync is deferred, while .ToObservable is not: with .ToObservable the async function has already been invoked by the time the observable exists, so the work starts as soon as the source yields an item.
The reason is the parameter type. Observable.FromAsync(...) takes a Func<Task>, a not-yet-started asynchronous operation that can be started when desired. .ToObservable is called on a Task, an asynchronous operation that has already started.
So with Observable.FromAsync(...) the subscriber controls when the asynchronous operation starts; with .ToObservable it cannot, because the operation is already running.
You can check this behavior by running the following example. With '#if true' (FromAsync) each BEGIN is followed by its END before the next BEGIN; change it to '#if false' (ToObservable) and a new BEGIN is printed every second, before earlier items print END:
*/
void Main()
{
    Observable.Interval(TimeSpan.FromSeconds(1))
        .SubscribeAsync(number => DoSomeWorkAsync(number));
}

async Task DoSomeWorkAsync(long number)
{
    Console.WriteLine($"DoSomeWorkAsync BEGIN '{number}'");
    await Task.Delay(TimeSpan.FromSeconds(3));
    Console.WriteLine($"DoSomeWorkAsync END '{number}'");
}
#if true
public static class MyObservableExtensions
{
    // Sequential: the next onNextAsync call starts only after the previous task has completed.
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Concat()
            .Subscribe();

    // Concurrent: onNextAsync starts for each item as it arrives, without waiting for earlier calls.
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Merge()
            .Subscribe();

    // Bounded concurrency: at most maxConcurrent onNextAsync calls are in flight at once.
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Merge(maxConcurrent)
            .Subscribe();
}
#else
public static class MyObservableExtensions
{
    // In this variant onNextAsync(item) is invoked inside Select, so the work starts as soon as
    // the item arrives; Concat and Merge(maxConcurrent) no longer delay or limit its execution.
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(item => onNextAsync(item).ToObservable()) // note ToObservable instead of FromAsync!
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(item => onNextAsync(item).ToObservable()) // note ToObservable instead of FromAsync!
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(item => onNextAsync(item).ToObservable()) // note ToObservable instead of FromAsync!
            .Merge(maxConcurrent)
            .Subscribe();
}
#endif
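
// A minimal sketch, not part of the original gist: the same deferred-vs-started
// distinction from the comment at the top, shown with plain tasks and no Rx.
// DeferredVsStartedDemo, RunAsync and WorkAsync are hypothetical names used only
// for this illustration. It assumes the System, System.Collections.Generic and
// System.Threading.Tasks namespaces are imported, as the rest of this file does.
public static class DeferredVsStartedDemo
{
    // Returns the order in which the events were recorded:
    // "deferred created", "started BEGIN", "started created", "deferred BEGIN".
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Func<Task>: nothing runs yet; the caller decides when to start it.
        Func<Task> deferred = () => WorkAsync("deferred", log);
        log.Add("deferred created");

        // Task: calling the async method starts it right away, before anyone awaits it.
        Task started = WorkAsync("started", log);
        log.Add("started created");

        // Only now does the deferred operation begin.
        Task fromDeferred = deferred();

        await Task.WhenAll(started, fromDeferred);
        return log;
    }

    private static async Task WorkAsync(string name, List<string> log)
    {
        // The part before the first await runs synchronously in the caller.
        log.Add($"{name} BEGIN");
        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }
}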