Created
November 1, 2024 14:22
-
-
Save Tirael/ff5bf7c6f20dfc04f5ca9551cf1edbf5 to your computer and use it in GitHub Desktop.
SubscribeAsync
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://github.com/dotnet/reactive/issues/459
// https://github.com/dotnet/reactive/issues/459#issuecomment-555607144
/*
Observable.FromAsync is deferred, while .ToObservable() is not: with .ToObservable(), the given async function is executed immediately, as soon as the source observable yields a new item.
This difference follows from the parameter types: Observable.FromAsync(...) takes a Func<Task>, whereas .ToObservable() is called on a Task. The former is an asynchronous operation that has not started yet and can be started when desired; the latter is an asynchronous operation that has already started.
Consequently, when you create an observable with Observable.FromAsync(...), the subscriber controls when the asynchronous operation starts; with .ToObservable() the subscriber cannot control this, because the operation has already started.
You can check this behavior by running the following example:
*/
// Script entry point: produces one tick per second and hands every tick to the
// asynchronous worker through the SubscribeAsync extension method.
void Main()
{
    var ticks = Observable.Interval(TimeSpan.FromSeconds(1));

    // The IDisposable returned by SubscribeAsync is intentionally not kept here.
    ticks.SubscribeAsync(tick => DoSomeWorkAsync(tick));
}
// Simulated unit of work: writes a BEGIN line, waits three seconds without
// blocking the thread, then writes the matching END line for the same number.
async Task DoSomeWorkAsync(long number)
{
    var simulatedDuration = TimeSpan.FromSeconds(3);

    Console.WriteLine($"DoSomeWorkAsync BEGIN '{number}'");
    await Task.Delay(simulatedDuration);
    Console.WriteLine($"DoSomeWorkAsync END '{number}'");
}
#if true | |
/// <summary>
/// Extension methods that subscribe to an observable sequence and run an
/// asynchronous callback for every element it produces.
/// </summary>
/// <remarks>
/// Each element is wrapped with <c>Observable.FromAsync</c>, which receives a
/// <c>Func&lt;Task&gt;</c> rather than a <c>Task</c>. The callback is therefore not
/// invoked by the projection itself; it is started only when the combining operator
/// (<c>Concat</c> or <c>Merge</c>) subscribes to that element's inner sequence.
/// None of the methods passes an error handler to <c>Subscribe()</c>, so a failing
/// callback or source is not handled here.
/// </remarks>
public static class MyObservableExtensions
{
    /// <summary>
    /// Runs <paramref name="onNextAsync"/> for each element, one call at a time:
    /// the inner sequences are concatenated, so the call for the next element is
    /// started only after the previous call has completed.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="source">Sequence whose elements are passed to the callback.</param>
    /// <param name="onNextAsync">Asynchronous callback invoked with each element.</param>
    /// <returns>A handle that cancels the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onNextAsync"/> is <c>null</c>.
    /// </exception>
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
    {
        EnsureNotNull(source, onNextAsync);

        return source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Concat()
            .Subscribe();
    }

    /// <summary>
    /// Runs <paramref name="onNextAsync"/> for each element without waiting for
    /// earlier calls: the inner sequences are merged, so calls may overlap.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="source">Sequence whose elements are passed to the callback.</param>
    /// <param name="onNextAsync">Asynchronous callback invoked with each element.</param>
    /// <returns>A handle that cancels the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onNextAsync"/> is <c>null</c>.
    /// </exception>
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
    {
        EnsureNotNull(source, onNextAsync);

        return source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Merge()
            .Subscribe();
    }

    /// <summary>
    /// Runs <paramref name="onNextAsync"/> for each element, passing
    /// <paramref name="maxConcurrent"/> to <c>Merge</c> to limit how many inner
    /// sequences (and therefore callback invocations) are active at the same time.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="source">Sequence whose elements are passed to the callback.</param>
    /// <param name="onNextAsync">Asynchronous callback invoked with each element.</param>
    /// <param name="maxConcurrent">Concurrency limit forwarded unchanged to <c>Merge</c>.</param>
    /// <returns>A handle that cancels the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="onNextAsync"/> is <c>null</c>.
    /// </exception>
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent)
    {
        EnsureNotNull(source, onNextAsync);

        return source
            .Select(item => Observable.FromAsync(() => onNextAsync(item)))
            .Merge(maxConcurrent)
            .Subscribe();
    }

    // Rejects null arguments at the call site. Without this check a null callback
    // would only fail later, inside the pipeline, when the first element arrives.
    private static void EnsureNotNull<T>(IObservable<T> source, Func<T, Task> onNextAsync)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (onNextAsync == null)
        {
            throw new ArgumentNullException(nameof(onNextAsync));
        }
    }
}
#else | |
/// <summary>
/// Comparison variant of the subscription helpers that converts the callback's
/// task with <c>ToObservable()</c> instead of wrapping the call in
/// <c>Observable.FromAsync</c>.
/// </summary>
/// <remarks>
/// The callback is invoked inside the <c>Select</c> projection, i.e. as soon as the
/// source yields an element, because the <c>Task</c> has to exist before
/// <c>ToObservable()</c> can be applied to it. <c>Concat</c> / <c>Merge</c> therefore
/// combine operations that have already been started.
/// </remarks>
public static class MyObservableExtensions
{
    /// <summary>
    /// Invokes <paramref name="onNextAsync"/> for each element and concatenates the
    /// resulting task-backed sequences.
    /// </summary>
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
    {
        // Note: ToObservable instead of FromAsync - the call has started by the time it is wrapped.
        var startedCalls = source.Select(item => onNextAsync(item).ToObservable());

        return startedCalls.Concat().Subscribe();
    }

    /// <summary>
    /// Invokes <paramref name="onNextAsync"/> for each element and merges the
    /// resulting task-backed sequences.
    /// </summary>
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync)
    {
        // Note: ToObservable instead of FromAsync - the call has started by the time it is wrapped.
        var startedCalls = source.Select(item => onNextAsync(item).ToObservable());

        return startedCalls.Merge().Subscribe();
    }

    /// <summary>
    /// Invokes <paramref name="onNextAsync"/> for each element and merges the
    /// resulting task-backed sequences, forwarding <paramref name="maxConcurrent"/>
    /// to <c>Merge</c>.
    /// </summary>
    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent)
    {
        // Note: ToObservable instead of FromAsync - the call has started by the time it is wrapped.
        var startedCalls = source.Select(item => onNextAsync(item).ToObservable());

        return startedCalls.Merge(maxConcurrent).Subscribe();
    }
}
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment