Created
July 16, 2022 16:02
-
-
Save Fijo/419c83694a359cba8ddd9867d47f6ef7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.ComponentModel; | |
namespace CosmicShores.Core.Rx { | |
public static class ObservableExtensions { | |
// PERF can be improved | |
// TODO propose integration this into System.Reactive itself. (https://github.com/dotnet/reactive/issues/1791#issuecomment-1186218797) | |
public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources, SourcelessBehaviour noSourceBehaviour) { | |
switch (noSourceBehaviour) { | |
case SourcelessBehaviour.SingleEmptyArray: | |
if (sources is null) | |
throw new ArgumentNullException(nameof(sources)); | |
// wrapping within Observable.Create so the IEnumerable isn't executed earlier than it normally would be. | |
return Observable.Create<IList<TSource>>(SingleEmptyArrayBehaviourSubscribe); | |
case SourcelessBehaviour.Default: | |
return sources.CombineLatest(); | |
default: | |
throw new InvalidEnumArgumentException(nameof(noSourceBehaviour), (int)noSourceBehaviour, typeof(SourcelessBehaviour)); | |
} | |
IDisposable SingleEmptyArrayBehaviourSubscribe(IObserver<IList<TSource>> observer) { | |
var sourceArray = sources.ToArray(); | |
var observable = sourceArray.Length == 0 | |
? Observable.Return(Array.Empty<TSource>()) | |
: sourceArray.CombineLatest(); | |
return observable.Subscribe(observer); | |
} | |
} | |
// PERF can be improved | |
public static IObservable<TResult> CombineLatest<TSource, TResult>(this IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, SourcelessBehaviour noSourceBehaviour) { | |
if (resultSelector is null) throw new ArgumentNullException(nameof(resultSelector)); | |
return sources.CombineLatest(noSourceBehaviour).Select(resultSelector); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace CosmicShores.Core.Rx { | |
/// <summary> | |
/// Behaviour for emiting when there are no sources. | |
/// </summary> | |
public enum SourcelessBehaviour : byte { | |
/// <summary> | |
/// Never emits (as of System.Reactive 5.0) | |
/// </summary> | |
Default = 0, | |
SingleEmptyArray = 1, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment