Skip to content

Instantly share code, notes, and snippets.

@neuecc
Created November 30, 2020 05:08
Show Gist options
  • Save neuecc/b739f6355148d81c862ebfbf14ab7165 to your computer and use it in GitHub Desktop.
Save neuecc/b739f6355148d81c862ebfbf14ab7165 to your computer and use it in GitHub Desktop.
public static class UniTaskObservableExtensions
{
public static IObservable<TResult> SelectManyUniTask<TSource, TResult>(this IObservable<TSource> observable,
Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync)
{
return new SelectManyUniTaskObservable<TSource, TResult>(observable, observeAsync);
}
class SelectManyUniTaskObservable<TSource, TResult> : IObservable<TResult>
{
readonly IObservable<TSource> source;
readonly Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync;
public SelectManyUniTaskObservable(IObservable<TSource> source, Func<TSource, IObserver<TResult>, CancellationToken, UniTask> observeAsync)
{
this.source = source;
this.observeAsync = observeAsync;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
var cts = new CancellationTokenSource();
var newObserver = new Observer(this, observer, cts);
var disposable = source.Subscribe(newObserver);
newObserver.SourceDisposable = disposable;
return newObserver;
}
class Observer : IObserver<TSource>, IDisposable
{
readonly SelectManyUniTaskObservable<TSource, TResult> parent;
readonly IObserver<TResult> observer;
readonly CancellationTokenSource tokenSource;
public IDisposable SourceDisposable { get; set; }
public Observer(SelectManyUniTaskObservable<TSource, TResult> parent, IObserver<TResult> observer, CancellationTokenSource tokenSource)
{
this.parent = parent;
this.observer = observer;
this.tokenSource = tokenSource;
}
public void OnNext(TSource value)
{
var task = parent.observeAsync(value, observer, tokenSource.Token);
RunTaskCore(task).Forget();
}
async UniTaskVoid RunTaskCore(UniTask task)
{
try
{
await task;
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
}
public void OnError(Exception error)
{
observer.OnError(error);
}
public void OnCompleted()
{
observer.OnCompleted();
}
public void Dispose()
{
try
{
tokenSource.Cancel();
tokenSource.Dispose();
}
finally
{
if (SourceDisposable != null)
{
SourceDisposable.Dispose();
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment