Last active
February 19, 2024 15:35
-
-
Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
withLatestFrom for Apple's Combine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Combine+WithLatestFrom.swift | |
// | |
// Created by Shai Mishali on 29/08/2019. | |
// Copyright © 2019 Shai Mishali. All rights reserved. | |
// | |
import Combine | |
// MARK: - Operator methods | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publisher { | |
/// Merges two publishers into a single publisher by combining each value | |
/// from self with the latest value from the second publisher, if any. | |
/// | |
/// - parameter other: Second observable source. | |
/// - parameter resultSelector: Function to invoke for each value from the self combined | |
/// with the latest value from the second source, if any. | |
/// | |
/// - returns: A publisher containing the result of combining each value of the self | |
/// with the latest value from the second publisher, if any, using the | |
/// specified result selector function. | |
func withLatestFrom<Other: Publisher, Result>(_ other: Other, | |
resultSelector: @escaping (Output, Other.Output) -> Result) | |
-> Publishers.WithLatestFrom<Self, Other, Result> { | |
return .init(upstream: self, second: other, resultSelector: resultSelector) | |
} | |
/// Upon an emission from self, emit the latest value from the | |
/// second publisher, if any exists. | |
/// | |
/// - parameter other: Second observable source. | |
/// | |
/// - returns: A publisher containing the latest value from the second publisher, if any. | |
func withLatestFrom<Other: Publisher>(_ other: Other) | |
-> Publishers.WithLatestFrom<Self, Other, Other.Output> { | |
return .init(upstream: self, second: other) { $1 } | |
} | |
} | |
// MARK: - Publisher | |
extension Publishers { | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
public struct WithLatestFrom<Upstream: Publisher, | |
Other: Publisher, | |
Output>: Publisher where Upstream.Failure == Other.Failure { | |
public typealias Failure = Upstream.Failure | |
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output | |
private let upstream: Upstream | |
private let second: Other | |
private let resultSelector: ResultSelector | |
private var latestValue: Other.Output? | |
init(upstream: Upstream, | |
second: Other, | |
resultSelector: @escaping ResultSelector) { | |
self.upstream = upstream | |
self.second = second | |
self.resultSelector = resultSelector | |
} | |
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | |
let sub = Subscription(upstream: upstream, | |
second: second, | |
resultSelector: resultSelector, | |
subscriber: subscriber) | |
subscriber.receive(subscription: sub) | |
} | |
} | |
} | |
// MARK: - Subscription | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publishers.WithLatestFrom { | |
private class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure { | |
private let subscriber: S | |
private let resultSelector: ResultSelector | |
private var latestValue: Other.Output? | |
private let upstream: Upstream | |
private let second: Other | |
private var firstSubscription: Cancellable? | |
private var secondSubscription: Cancellable? | |
init(upstream: Upstream, | |
second: Other, | |
resultSelector: @escaping ResultSelector, | |
subscriber: S) { | |
self.upstream = upstream | |
self.second = second | |
self.subscriber = subscriber | |
self.resultSelector = resultSelector | |
trackLatestFromSecond() | |
} | |
func request(_ demand: Subscribers.Demand) { | |
// withLatestFrom always takes one latest value from the second | |
// observable, so demand doesn't really have a meaning here. | |
firstSubscription = upstream | |
.sink( | |
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) }, | |
receiveValue: { [weak self] value in | |
guard let self = self else { return } | |
guard let latest = self.latestValue else { return } | |
_ = self.subscriber.receive(self.resultSelector(value, latest)) | |
}) | |
} | |
// Create an internal subscription to the `Other` publisher, | |
// constantly tracking its latest value | |
private func trackLatestFromSecond() { | |
let subscriber = AnySubscriber<Other.Output, Other.Failure>( | |
receiveSubscription: { [weak self] subscription in | |
self?.secondSubscription = subscription | |
subscription.request(.unlimited) | |
}, | |
receiveValue: { [weak self] value in | |
self?.latestValue = value | |
return .unlimited | |
}, | |
receiveCompletion: nil) | |
self.second.subscribe(subscriber) | |
} | |
func cancel() { | |
firstSubscription?.cancel() | |
secondSubscription?.cancel() | |
} | |
} | |
} |
Got it, thanks.
Here's a version composed out of built-in operators instead of rebuilding the publisher:
import Combine
import Dispatch
extension Publishers {
struct WithLatestFrom<Upstream: Publisher, Side: Publisher, Output>: Publisher where Upstream.Failure == Side.Failure {
typealias Failure = Side.Failure
typealias ResultSelector = (Upstream.Output, Side.Output) -> Output
let upstream: Upstream
let side: Side
let resultSelector: ResultSelector
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let timestampedUpstream = upstream.map(TimestampedValue.init)
let timestampedSide = side.map(TimestampedValue.init)
let state = timestampedUpstream.combineLatest(timestampedSide, State.init)
let mappedValues = state
.filter { $0.upstream.time >= $0.side.time }
.map { resultSelector($0.upstream.value, $0.side.value) }
mappedValues.receive(subscriber: subscriber)
}
private struct TimestampedValue<T> {
let value: T
let time: DispatchTime
init(value: T) {
self.value = value
self.time = DispatchTime.now()
}
}
private struct State {
let upstream: TimestampedValue<Upstream.Output>
let side: TimestampedValue<Side.Output>
}
}
}
extension Publisher {
func withLatestFrom<P: Publisher, R>(_ publisher: P,
resultSelector: @escaping (Output, P.Output) -> R) -> Publishers.WithLatestFrom<Self, P, R> {
Publishers.WithLatestFrom(upstream: self, side: publisher, resultSelector: resultSelector)
}
func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> {
withLatestFrom(publisher, resultSelector: { $1 })
}
}
Thanks @pawlowskialex.
If you're going down that path, here's a neatly composed variation @jasdev wrote:
extension Publisher {
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> AnyPublisher<Result, Failure>
where Other.Failure == Failure {
let upstream = share()
return other
.map { second in upstream.map { resultSelector($0, second) } }
.switchToLatest()
.zip(upstream) // `zip`ping and discarding `\.1` allows for
// upstream completions to be projected down immediately.
.map(\.0)
.eraseToAnyPublisher()
}
}
And a blot post he wrote about this topic: https://jasdev.me/notes/with-latest-from
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As far as I'm concerned it's MIT. Feel free to use it.
It will be part of CombineCommunity/CombineExt, soon