Skip to content

Instantly share code, notes, and snippets.

@freak4pc
Last active February 19, 2024 15:35
Show Gist options
  • Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
withLatestFrom for Apple's Combine
//
// Combine+WithLatestFrom.swift
//
// Created by Shai Mishali on 29/08/2019.
// Copyright © 2019 Shai Mishali. All rights reserved.
//
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: Second observable source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> Publishers.WithLatestFrom<Self, Other, Result> {
return .init(upstream: self, second: other, resultSelector: resultSelector)
}
/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: Second observable source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
return .init(upstream: self, second: other) { $1 }
}
}
// MARK: - Publisher
extension Publishers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct WithLatestFrom<Upstream: Publisher,
Other: Publisher,
Output>: Publisher where Upstream.Failure == Other.Failure {
public typealias Failure = Upstream.Failure
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output
private let upstream: Upstream
private let second: Other
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.resultSelector = resultSelector
}
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let sub = Subscription(upstream: upstream,
second: second,
resultSelector: resultSelector,
subscriber: subscriber)
subscriber.receive(subscription: sub)
}
}
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.WithLatestFrom {
private class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
private let subscriber: S
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
private let upstream: Upstream
private let second: Other
private var firstSubscription: Cancellable?
private var secondSubscription: Cancellable?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector,
subscriber: S) {
self.upstream = upstream
self.second = second
self.subscriber = subscriber
self.resultSelector = resultSelector
trackLatestFromSecond()
}
func request(_ demand: Subscribers.Demand) {
// withLatestFrom always takes one latest value from the second
// observable, so demand doesn't really have a meaning here.
firstSubscription = upstream
.sink(
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) },
receiveValue: { [weak self] value in
guard let self = self else { return }
guard let latest = self.latestValue else { return }
_ = self.subscriber.receive(self.resultSelector(value, latest))
})
}
// Create an internal subscription to the `Other` publisher,
// constantly tracking its latest value
private func trackLatestFromSecond() {
let subscriber = AnySubscriber<Other.Output, Other.Failure>(
receiveSubscription: { [weak self] subscription in
self?.secondSubscription = subscription
subscription.request(.unlimited)
},
receiveValue: { [weak self] value in
self?.latestValue = value
return .unlimited
},
receiveCompletion: nil)
self.second.subscribe(subscriber)
}
func cancel() {
firstSubscription?.cancel()
secondSubscription?.cancel()
}
}
}
@serbats
Copy link

serbats commented Feb 12, 2020

I'm new to Combine and could be mistaken but I think this is how Apple recommends doing.
Synchronise demand for upstream subscriber and our main subscriber is main idea here.

I don't see good examples over the internet how to do that correctly. Probably you are right that this handling is not correct or not really useful.
Also haven't found any operators for materialize/dematerialize. Will try to write them by myself. really miss good examples over the internet

@freak4pc
Copy link
Author

Yeah there aren't a lot of good examples :(
Basically if your operator can produce a lot of elements, definitely you must support backpressure, but in this case you always want one and always a specific one (with latest), so back pressure support would just hinder the capabilities of this operator

@keylook
Copy link

keylook commented Feb 27, 2020

This is great. Spent too many hours trying to figure the way around Combine to have this behavior. The question is can this gist be used? What license is it?

@freak4pc
Copy link
Author

As far as I'm concerned it's MIT. Feel free to use it.
It will be part of CombineCommunity/CombineExt, soon

@keylook
Copy link

keylook commented Feb 27, 2020

Got it, thanks.

@pawlowskialex
Copy link

Here's a version composed out of built-in operators instead of rebuilding the publisher:

import Combine
import Dispatch

extension Publishers {
    struct WithLatestFrom<Upstream: Publisher, Side: Publisher, Output>: Publisher where Upstream.Failure == Side.Failure {
        typealias Failure = Side.Failure
        typealias ResultSelector = (Upstream.Output, Side.Output) -> Output

        let upstream: Upstream
        let side: Side
        let resultSelector: ResultSelector
        
        func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let timestampedUpstream = upstream.map(TimestampedValue.init)
            let timestampedSide = side.map(TimestampedValue.init)
            let state = timestampedUpstream.combineLatest(timestampedSide, State.init)
            let mappedValues = state
                .filter { $0.upstream.time >= $0.side.time }
                .map { resultSelector($0.upstream.value, $0.side.value) }
            
            mappedValues.receive(subscriber: subscriber)
        }
        
        private struct TimestampedValue<T> {
            let value: T
            let time: DispatchTime
            
            init(value: T) {
                self.value = value
                self.time = DispatchTime.now()
            }
        }
        
        private struct State {
            let upstream: TimestampedValue<Upstream.Output>
            let side: TimestampedValue<Side.Output>
        }
    }
}

extension Publisher {
    func withLatestFrom<P: Publisher, R>(_ publisher: P,
                                         resultSelector: @escaping (Output, P.Output) -> R) -> Publishers.WithLatestFrom<Self, P, R> {
        Publishers.WithLatestFrom(upstream: self, side: publisher, resultSelector: resultSelector)
    }

    func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> {
        withLatestFrom(publisher, resultSelector: { $1 })
    }
}

@freak4pc
Copy link
Author

freak4pc commented Aug 6, 2021

Thanks @pawlowskialex.
If you're going down that path, here's a neatly composed variation @jasdev wrote:

extension Publisher {
  func withLatestFrom<Other: Publisher, Result>(_ other: Other,
                                                    resultSelector: @escaping (Output, Other.Output) -> Result)
      -> AnyPublisher<Result, Failure>
      where Other.Failure == Failure {
          let upstream = share()
  
          return other
              .map { second in upstream.map { resultSelector($0, second) } }
              .switchToLatest()
              .zip(upstream) // `zip`ping and discarding `\.1` allows for
                                        // upstream completions to be projected down immediately.
              .map(\.0)
              .eraseToAnyPublisher()
      }
}

And a blot post he wrote about this topic: https://jasdev.me/notes/with-latest-from

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment