-
-
Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
// | |
// Combine+WithLatestFrom.swift | |
// | |
// Created by Shai Mishali on 29/08/2019. | |
// Copyright © 2019 Shai Mishali. All rights reserved. | |
// | |
import Combine | |
// MARK: - Operator methods | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publisher { | |
/// Merges two publishers into a single publisher by combining each value | |
/// from self with the latest value from the second publisher, if any. | |
/// | |
/// - parameter other: Second observable source. | |
/// - parameter resultSelector: Function to invoke for each value from the self combined | |
/// with the latest value from the second source, if any. | |
/// | |
/// - returns: A publisher containing the result of combining each value of the self | |
/// with the latest value from the second publisher, if any, using the | |
/// specified result selector function. | |
func withLatestFrom<Other: Publisher, Result>(_ other: Other, | |
resultSelector: @escaping (Output, Other.Output) -> Result) | |
-> Publishers.WithLatestFrom<Self, Other, Result> { | |
return .init(upstream: self, second: other, resultSelector: resultSelector) | |
} | |
/// Upon an emission from self, emit the latest value from the | |
/// second publisher, if any exists. | |
/// | |
/// - parameter other: Second observable source. | |
/// | |
/// - returns: A publisher containing the latest value from the second publisher, if any. | |
func withLatestFrom<Other: Publisher>(_ other: Other) | |
-> Publishers.WithLatestFrom<Self, Other, Other.Output> { | |
return .init(upstream: self, second: other) { $1 } | |
} | |
} | |
// MARK: - Publisher | |
extension Publishers { | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
public struct WithLatestFrom<Upstream: Publisher, | |
Other: Publisher, | |
Output>: Publisher where Upstream.Failure == Other.Failure { | |
public typealias Failure = Upstream.Failure | |
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output | |
private let upstream: Upstream | |
private let second: Other | |
private let resultSelector: ResultSelector | |
private var latestValue: Other.Output? | |
init(upstream: Upstream, | |
second: Other, | |
resultSelector: @escaping ResultSelector) { | |
self.upstream = upstream | |
self.second = second | |
self.resultSelector = resultSelector | |
} | |
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | |
let sub = Subscription(upstream: upstream, | |
second: second, | |
resultSelector: resultSelector, | |
subscriber: subscriber) | |
subscriber.receive(subscription: sub) | |
} | |
} | |
} | |
// MARK: - Subscription | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publishers.WithLatestFrom { | |
private class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure { | |
private let subscriber: S | |
private let resultSelector: ResultSelector | |
private var latestValue: Other.Output? | |
private let upstream: Upstream | |
private let second: Other | |
private var firstSubscription: Cancellable? | |
private var secondSubscription: Cancellable? | |
init(upstream: Upstream, | |
second: Other, | |
resultSelector: @escaping ResultSelector, | |
subscriber: S) { | |
self.upstream = upstream | |
self.second = second | |
self.subscriber = subscriber | |
self.resultSelector = resultSelector | |
trackLatestFromSecond() | |
} | |
func request(_ demand: Subscribers.Demand) { | |
// withLatestFrom always takes one latest value from the second | |
// observable, so demand doesn't really have a meaning here. | |
firstSubscription = upstream | |
.sink( | |
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) }, | |
receiveValue: { [weak self] value in | |
guard let self = self else { return } | |
guard let latest = self.latestValue else { return } | |
_ = self.subscriber.receive(self.resultSelector(value, latest)) | |
}) | |
} | |
// Create an internal subscription to the `Other` publisher, | |
// constantly tracking its latest value | |
private func trackLatestFromSecond() { | |
let subscriber = AnySubscriber<Other.Output, Other.Failure>( | |
receiveSubscription: { [weak self] subscription in | |
self?.secondSubscription = subscription | |
subscription.request(.unlimited) | |
}, | |
receiveValue: { [weak self] value in | |
self?.latestValue = value | |
return .unlimited | |
}, | |
receiveCompletion: nil) | |
self.second.subscribe(subscriber) | |
} | |
func cancel() { | |
firstSubscription?.cancel() | |
secondSubscription?.cancel() | |
} | |
} | |
} |
It seems to me like withLatestFrom always takes the first value of the second subscription?
Given this example code:
let digits = [1, 2, 3]
.publisher
.delay(for: .seconds(1), scheduler: DispatchQueue.main)
.print("digits")
let letters = ["a", "b", "c"]
.publisher
.print("letters")
var values: [(Int, String)] = []
digits
.withLatestFrom(letters) { ($0, $1) }
.print("withLatestFrom")
.sink(
receiveCompletion: { _ in print("values:", values) },
receiveValue: { values.append(($0, $1)) }
)
.store(in: &cancellables)
We get this log output:
letters: receive subscription: (["a", "b", "c"])
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive subscription: ((extension in Spielwiese_Sources):Combine.Publishers.WithLatestFrom<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>, Combine.Publishers.Print<Combine.Publishers.Sequence<Swift.Array<Swift.String>, Swift.Never>>, (Swift.Int, Swift.String)>.(unknown context at $10dfb7b10).Subscription<Combine.Publishers.Print<(extension in Spielwiese_Sources):Combine.Publishers.WithLatestFrom<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>, Combine.Publishers.Print<Combine.Publishers.Sequence<Swift.Array<Swift.String>, Swift.Never>>, (Swift.Int, Swift.String)>>.(unknown context at $7fff233d5eb0).Inner<Combine.Subscribers.Sink<(Swift.Int, Swift.String), Swift.Never>>>)
withLatestFrom: request unlimited
digits: receive subscription: (Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>.(unknown context at $7fff233e1210).Inner<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>.(unknown context at $7fff233d5eb0).Inner<Combine.Subscribers.Sink<Swift.Int, Swift.Never>>>)
digits: request unlimited
digits: receive value: (1)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((1, "a"))
digits: receive value: (2)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((2, "a"))
digits: receive value: (3)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((3, "a"))
digits: receive finished
withLatestFrom: receive finished
digits: receive cancel
values: [(1, "a"), (2, "a"), (3, "a")]
I'd expect values
to be [(1, "c"), (2, "c"), (3, "c")]
. I suspect the issue might be that the letters
subscription shouldn't get repeatedly canceled and re-subscribed to? Apologies in advance if this isn't an issue with withLatestFrom
but instead my limited understanding of how this should work. :)
Good catch - the cancellation is indeed the issue and also the lazy demand (e.g. max(1)
). I've made a few adjustments, let me know if that helps :) @msewell
@freak4pc That does seem to work better, yes! Nice work!
I believe this is one of the most important operators missing in Combine.
However, in the trackLatestFromSecond
private function, does it make a difference if we change the AnySubscriber
usage to second.sink(receiveCompletion:receiveValue:)
?
I'd suggest to improve this a bit with handling backpressure:
func request(_ demand: Subscribers.Demand) {
if demand == .unlimited {
firstSubscription = upstream
.sink(
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) },
receiveValue: { [weak self] value in
guard let self = self else { return }
guard let latest = self.latestValue else { return }
_ = self.subscriber.receive(self.resultSelector(value, latest))
})
} else if demand > .none {
let subscriber = AnySubscriber<Upstream.Output, Upstream.Failure>(receiveSubscription: { [weak self] subscription in
self?.firstSubscription = subscription
subscription.request(demand)
},
receiveValue: { [weak self] value in
guard let self = self else { return .max(1) }
guard let latest = self.latestValue else { return .max(1) }
return self.subscriber.receive(self.resultSelector(value, latest))
},
receiveCompletion: { [weak self] in self?.subscriber.receive(completion: $0) })
self.upstream.subscribe(subscriber)
}
}
Also trackLatestFromSecond could be simplified:
private func trackLatestFromSecond() {
self.secondSubscription = self.second
.sink(receiveCompletion: {_ in },
receiveValue: { [weak self] value in
self?.latestValue = value
})
}
Happy that found this. Really most missing operator.
What is the purpose of handling back pressure for an operator that can only handle/use .max(1) ? Seems pointless
The only thing you might want to support is .none
, perhaps
I'm new to Combine and could be mistaken but I think this is how Apple recommends doing.
Synchronise demand for upstream subscriber and our main subscriber is main idea here.
I don't see good examples over the internet how to do that correctly. Probably you are right that this handling is not correct or not really useful.
Also haven't found any operators for materialize/dematerialize. Will try to write them by myself. really miss good examples over the internet
Yeah there aren't a lot of good examples :(
Basically if your operator can produce a lot of elements, definitely you must support backpressure, but in this case you always want one and always a specific one (with latest), so back pressure support would just hinder the capabilities of this operator
This is great. Spent too many hours trying to figure the way around Combine to have this behavior. The question is can this gist be used? What license is it?
As far as I'm concerned it's MIT. Feel free to use it.
It will be part of CombineCommunity/CombineExt, soon
Got it, thanks.
Here's a version composed out of built-in operators instead of rebuilding the publisher:
import Combine
import Dispatch
extension Publishers {
struct WithLatestFrom<Upstream: Publisher, Side: Publisher, Output>: Publisher where Upstream.Failure == Side.Failure {
typealias Failure = Side.Failure
typealias ResultSelector = (Upstream.Output, Side.Output) -> Output
let upstream: Upstream
let side: Side
let resultSelector: ResultSelector
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let timestampedUpstream = upstream.map(TimestampedValue.init)
let timestampedSide = side.map(TimestampedValue.init)
let state = timestampedUpstream.combineLatest(timestampedSide, State.init)
let mappedValues = state
.filter { $0.upstream.time >= $0.side.time }
.map { resultSelector($0.upstream.value, $0.side.value) }
mappedValues.receive(subscriber: subscriber)
}
private struct TimestampedValue<T> {
let value: T
let time: DispatchTime
init(value: T) {
self.value = value
self.time = DispatchTime.now()
}
}
private struct State {
let upstream: TimestampedValue<Upstream.Output>
let side: TimestampedValue<Side.Output>
}
}
}
extension Publisher {
func withLatestFrom<P: Publisher, R>(_ publisher: P,
resultSelector: @escaping (Output, P.Output) -> R) -> Publishers.WithLatestFrom<Self, P, R> {
Publishers.WithLatestFrom(upstream: self, side: publisher, resultSelector: resultSelector)
}
func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> {
withLatestFrom(publisher, resultSelector: { $1 })
}
}
Thanks @pawlowskialex.
If you're going down that path, here's a neatly composed variation @jasdev wrote:
extension Publisher {
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> AnyPublisher<Result, Failure>
where Other.Failure == Failure {
let upstream = share()
return other
.map { second in upstream.map { resultSelector($0, second) } }
.switchToLatest()
.zip(upstream) // `zip`ping and discarding `\.1` allows for
// upstream completions to be projected down immediately.
.map(\.0)
.eraseToAnyPublisher()
}
}
And a blot post he wrote about this topic: https://jasdev.me/notes/with-latest-from
Example usage:
Prints out: