Created
November 30, 2020 05:08
-
-
Save neuecc/b739f6355148d81c862ebfbf14ab7165 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class UniTaskObservableExtensions | |
{ | |
public static IObservable<TResult> SelectManyUniTask<TSource, TResult>(this IObservable<TSource> observable, | |
Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync) | |
{ | |
return new SelectManyUniTaskObservable<TSource, TResult>(observable, observeAsync); | |
} | |
class SelectManyUniTaskObservable<TSource, TResult> : IObservable<TResult> | |
{ | |
readonly IObservable<TSource> source; | |
readonly Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync; | |
public SelectManyUniTaskObservable(IObservable<TSource> source, Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync) | |
{ | |
this.source = source; | |
this.observeAsync = observeAsync; | |
} | |
public IDisposable Subscribe(IObserver<TResult> observer) | |
{ | |
var cts = new CancellationTokenSource(); | |
var newObserver = new Observer(this, observer, cts); | |
var disposable = source.Subscribe(newObserver); | |
newObserver.SourceDisposable = disposable; | |
return newObserver; | |
} | |
class Observer : IObserver<TSource>, IDisposable | |
{ | |
readonly SelectManyUniTaskObservable<TSource, TResult> parent; | |
readonly IObserver<TResult> observer; | |
readonly CancellationTokenSource tokenSource; | |
public IDisposable SourceDisposable { get; set; } | |
public Observer(SelectManyUniTaskObservable<TSource, TResult> parent, IObserver<TResult> observer, CancellationTokenSource tokenSource) | |
{ | |
this.parent = parent; | |
this.observer = observer; | |
this.tokenSource = tokenSource; | |
} | |
public void OnNext(TSource value) | |
{ | |
var task = parent.observeAsync(value, observer, tokenSource.Token); | |
RunTaskCore(task).Forget(); | |
} | |
async UniTaskVoid RunTaskCore(UniTask task) | |
{ | |
try | |
{ | |
await task; | |
} | |
catch (Exception ex) | |
{ | |
observer.OnError(ex); | |
return; | |
} | |
} | |
public void OnError(Exception error) | |
{ | |
observer.OnError(error); | |
} | |
public void OnCompleted() | |
{ | |
observer.OnCompleted(); | |
} | |
public void Dispose() | |
{ | |
try | |
{ | |
tokenSource.Cancel(); | |
tokenSource.Dispose(); | |
} | |
finally | |
{ | |
if (SourceDisposable != null) | |
{ | |
SourceDisposable.Dispose(); | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment