Last active
May 3, 2021 23:12
-
-
Save swhitty/03a50ba8201a1b3d46400e5b6cb54516 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
public extension Publisher { | |
/// Converts a publisher's output into the most recent output of another publisher. | |
/// | |
/// - parameter other: A publisher that will provide the latest value | |
/// - returns: A publisher that emits the latest ouput of another publisher | |
func withLatestFrom<Other: Publisher>(_ other: Other) -> WithLatestFrom<Self, Other, Other.Output> { | |
WithLatestFrom(upstream: self, other: other, transform: { _, other in other }) | |
} | |
/// Merges a publisher's output with the most recent output of another publisher using a transformer. | |
/// | |
/// - parameter other: A publisher that will provide the latest value | |
/// - parameter transform: A closure that transforms the output of both publishers | |
/// - returns: A publisher that emits the transformed ouput | |
func withLatestFrom<Other: Publisher, T>(_ other: Other, | |
transform: @escaping (Output, Other.Output) -> T) -> WithLatestFrom<Self, Other, T> { | |
WithLatestFrom(upstream: self, other: other, transform: transform) | |
} | |
} | |
public struct WithLatestFrom<Upstream: Publisher, Other: Publisher, Output>: Publisher { | |
public typealias Failure = Upstream.Failure | |
public typealias Transformer = (Upstream.Output, Other.Output) -> Output | |
private let upstream: Upstream | |
private let other: Other | |
private let transform: Transformer | |
public init(upstream: Upstream, other: Other, transform: @escaping Transformer) { | |
self.upstream = upstream | |
self.other = other | |
self.transform = transform | |
} | |
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input { | |
let sub = Subscription( | |
subscriber: subscriber, | |
upstream: upstream, | |
other: other, | |
transform: transform | |
) | |
subscriber.receive(subscription: sub) | |
} | |
} | |
private extension WithLatestFrom { | |
final class Subscription<S: Subscriber>: Combine.Subscription where Upstream.Failure == S.Failure, Output == S.Input { | |
private let upstream: Upstream | |
private let subscriber: S | |
private let transform: Transformer | |
private var latest: Other.Output? | |
private var latestCancellable: AnyCancellable? | |
private var upstreamCancellable: AnyCancellable? | |
init(subscriber: S, upstream: Upstream, other: Other, transform: @escaping Transformer) { | |
self.upstream = upstream | |
self.subscriber = subscriber | |
self.transform = transform | |
self.latestCancellable = other.sink(receiveCompletion: { _ in }) { [weak self] value in | |
self?.latest = value | |
} | |
} | |
func request(_ demand: Subscribers.Demand) { | |
upstreamCancellable = upstream.sink( | |
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) }, | |
receiveValue: { [weak self, subscriber, transform] value in | |
guard let latest = self?.latest else { return } | |
_ = subscriber.receive(transform(value, latest)) | |
} | |
) | |
} | |
func cancel() { | |
latestCancellable?.cancel() | |
upstreamCancellable?.cancel() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment