@JakeGinnivan
Last active August 29, 2015 14:19
WithLatestFrom
void Main()
{
    // stream1 ticks every 5 seconds, stream2 every second.
    var stream1 = Observable.Interval(TimeSpan.FromSeconds(5)).Select(i => "a" + i);
    var stream2 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "b" + i);
    // DumpLatest is a LINQPad extension; it shows the most recent value.
    stream1.WithLatestFrom(stream2, (a, b) => new { a, b }).DumpLatest(true);
}

public static class ObservableExtensions
{
    public static IObservable<TResult> WithLatestFrom<TResult, TFirst, TSecond>(
        this IObservable<TFirst> first,
        IObservable<TSecond> second,
        Func<TFirst, TSecond, TResult> resultSelector)
    {
        return Observable.Create<TResult>(obs =>
        {
            // Replay the last value of first so that each switch re-emits it immediately.
            var replay = first.Replay(1);
            // Connect so the replay starts listening to first right away (makes it hot).
            var replaySubs = replay.Connect();
            // Subscribe to second and switch to a fresh view of first on every value.
            // Not sure which way around you want this API to work: because of the
            // replay, a result is produced whenever either stream emits, once both
            // have produced a value.
            var subs = second
                .Select(a => replay.Select(b => new { a, b }))
                .Switch()
                .Select(val => resultSelector(val.b, val.a))
                .Subscribe(obs);
            // Disposing the result tears down both the replay connection and the subscription.
            return new CompositeDisposable(replaySubs, subs);
        });
    }
}
@khellang

@james-world came up with an even shorter approach 😁

public static IObservable<TResult> WithLatestFrom<TResult, TFirst, TSecond>(
    this IObservable<TFirst> first,
    IObservable<TSecond> second,
    Func<TFirst, TSecond, TResult> resultSelector)
{
    return first.Publish(os => second.Select(a => os.Select(b => resultSelector(b, a))).Switch());
}
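
To see what the Publish-based version does without waiting on timers, here is a rough sketch that drives it with two `Subject<string>` instances. It assumes a console project referencing the System.Reactive package. The names `WithLatestFromViaPublish`, `PublishDemo` and `Run` are made up for this example; the rename is there because recent System.Reactive versions ship their own `WithLatestFrom`, and an extension with the same name and parameters would be ambiguous.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishBasedExtensions
{
    // Same body as the snippet above, under a hypothetical name to avoid
    // clashing with the operator built into newer System.Reactive versions.
    public static IObservable<TResult> WithLatestFromViaPublish<TResult, TFirst, TSecond>(
        this IObservable<TFirst> first,
        IObservable<TSecond> second,
        Func<TFirst, TSecond, TResult> resultSelector)
    {
        return first.Publish(os => second.Select(a => os.Select(b => resultSelector(b, a))).Switch());
    }
}

public static class PublishDemo
{
    // Pushes a fixed sequence through two subjects and collects the results.
    public static List<string> Run()
    {
        var first = new Subject<string>();
        var second = new Subject<string>();
        var results = new List<string>();

        using (first.WithLatestFromViaPublish(second, (a, b) => a + b)
                    .Subscribe(x => results.Add(x)))
        {
            first.OnNext("a0");  // dropped: second has not produced a value yet
            second.OnNext("b0"); // no output: only first triggers results
            second.OnNext("b1");
            first.OnNext("a1");  // emits "a1b1"
            second.OnNext("b2");
            first.OnNext("a2");  // emits "a2b2"
        }

        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "a1b1,a2b2"
    }
}
```

Since `Publish` does not replay anything, values from `second` alone never produce output here; a result appears only when `first` emits after `second` has produced at least one value.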