Last active
September 13, 2020 20:52
-
-
Save serbats/e7effd6c96a56e2377f36618061088ef to your computer and use it in GitHub Desktop.
Missing Apple Combine Operators: withLatestFrom, materialize, dematerialize
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
// Base Abstraction Classes | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
protocol DownStreamHelperProtocol { | |
/// Helper for handling backpressure | |
/// DownstreamSubscription class use it as delegate for backpressure handling | |
associatedtype Input | |
associatedtype Failure: Error | |
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand | |
func downstream(_ value: Input) -> Subscribers.Demand | |
func downstream(_ completion: Subscribers.Completion<Failure>) | |
func cancel() | |
} | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
final class DownstreamSubscription<Upstream: Publisher, Helper: DownStreamHelperProtocol>: Subscription, Subscriber where Upstream.Output == Helper.Input, Upstream.Failure == Helper.Failure { | |
/// Serves as upstream subscriber and keeps it subscription. | |
/// Also is a subscription for main operator publisher | |
/// Delegates everything to helper for correct handling | |
public typealias Failure = Upstream.Failure | |
public typealias Input = Upstream.Output | |
private let upstream: Upstream | |
private let downStream: Helper | |
private var upstreamSubscription: Subscription? | |
private var mutex = pthread_mutex_t() | |
init(upstream: Upstream, downStream: Helper) { | |
pthread_mutex_init(&mutex, nil) | |
self.upstream = upstream | |
self.downStream = downStream | |
} | |
func request(_ demand: Subscribers.Demand) { | |
guard demand > .none else { return } | |
if let upstreamSubscription = self.upstreamSubscription { | |
let demandModified = downStream.request(demand) | |
if demandModified > .none { | |
upstreamSubscription.request(demandModified) | |
} | |
} else { | |
// https://en.wikipedia.org/wiki/Double-checked_locking | |
pthread_mutex_lock(&mutex) | |
if upstreamSubscription == nil { | |
upstream.subscribe(self) // this should call receive(subscription: Subscription) | |
//upstreamSubscription should be not nil starting from here | |
} | |
pthread_mutex_unlock(&mutex) | |
let demandModified = downStream.request(demand) | |
if demandModified > .none { | |
upstreamSubscription?.request(demandModified) | |
} | |
} | |
} | |
func cancel() { | |
downStream.cancel() | |
upstreamSubscription?.cancel() | |
upstreamSubscription = nil | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
return self.downStream.downstream(input) | |
} | |
func receive(subscription: Subscription) { | |
self.upstreamSubscription = subscription | |
} | |
func receive(completion: Subscribers.Completion<Failure>) { | |
self.downStream.downstream(completion) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
// Mark: - Event enum | |
public enum Event<Value, Failure: Error> { | |
case value(Value) | |
case completion(Subscribers.Completion<Failure>) | |
func error() -> Failure? { | |
switch self { | |
case .value(_): | |
return nil | |
case .completion(let compl): | |
switch compl { | |
case .finished: | |
return nil | |
case .failure(let error): | |
return error | |
} | |
} | |
} | |
} | |
// MARK: - Operator methods | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publisher { | |
/// Convert any Publisher into an Publisher of its Events. | |
/// | |
/// - parameter forceRecieveCompletion: If Event.completion should be delivered | |
/// when subscriber asks for .none elements | |
/// - returns: A publisher of Events | |
func materialize(forceRecieveCompletion: Bool = true) | |
-> Publishers.Materialize<Self> { | |
return .init(upstream: self, forceRecieveCompletion: forceRecieveCompletion) | |
} | |
/// Convert Publisher of Events into Publisher. | |
/// | |
/// - returns: A publisher from events | |
func dematerialize<Value, Failure: Error>() | |
-> Publishers.Dematerialize<Self, Value, Failure> where Self.Output == Event<Value, Failure>, Self.Failure == Never { | |
return .init(upstream: self) | |
} | |
} | |
// MARK: - Publisher | |
extension Publishers { | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
public struct Materialize<Upstream: Publisher>: Publisher { | |
public typealias Output = Event<Upstream.Output, Upstream.Failure> | |
public typealias Failure = Never | |
private let forceRecieveCompletion: Bool | |
private let upstream: Upstream | |
init(upstream: Upstream, forceRecieveCompletion: Bool) { | |
self.upstream = upstream | |
self.forceRecieveCompletion = forceRecieveCompletion | |
} | |
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | |
var sub: Subscription! | |
if forceRecieveCompletion { | |
let helper = DownStreamHelperForceReceive(subscriber: subscriber) | |
sub = DownstreamSubscription(upstream: upstream, downStream: helper) | |
} else { | |
let helper = DownStreamHelperWaitReceive(subscriber: subscriber) | |
sub = DownstreamSubscription(upstream: upstream, downStream: helper) | |
} | |
subscriber.receive(subscription: sub) | |
} | |
} | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
public struct Dematerialize<Upstream: Publisher, Value, Failure: Error>: Publisher where Upstream.Output == Event<Value, Failure>, Upstream.Failure == Never { | |
public typealias Output = Value | |
private let upstream: Upstream | |
init(upstream: Upstream) { | |
self.upstream = upstream | |
} | |
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | |
let helper = DownStreamHelper<S>(subscriber: subscriber) | |
let sub = DownstreamSubscription(upstream: upstream, downStream: helper) | |
subscriber.receive(subscription: sub) | |
} | |
} | |
} | |
// MARK: - Subscription | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publishers.Materialize { | |
private final class DownStreamHelperForceReceive<S: Subscriber>: DownStreamHelperProtocol where S.Input == Event<Upstream.Output, Upstream.Failure>, S.Failure == Never { | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private var subscriber: S? | |
init(subscriber: S) { | |
self.subscriber = subscriber | |
} | |
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand { | |
return demand | |
} | |
func downstream(_ value: Input) -> Subscribers.Demand { | |
return self.subscriber?.receive(Event.value(value)) ?? .none | |
} | |
func downstream(_ completion: Subscribers.Completion<Failure>) { | |
_ = self.subscriber?.receive(Event.completion(completion)) | |
self.subscriber?.receive(completion: .finished) | |
self.subscriber = nil | |
} | |
func cancel() { | |
subscriber = nil | |
} | |
} | |
private final class DownStreamHelperWaitReceive<S: Subscriber>: DownStreamHelperProtocol where S.Input == Event<Upstream.Output, Upstream.Failure>, S.Failure == Never { | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private var subscriber: S? | |
private var demand: Subscribers.Demand = .none | |
private var completion: Subscribers.Completion<Upstream.Failure>? | |
private var mutex = pthread_mutex_t() | |
init(subscriber: S) { | |
pthread_mutex_init(&mutex, nil) | |
self.subscriber = subscriber | |
} | |
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand { | |
pthread_mutex_lock(&mutex) | |
if let completion = self.completion { | |
_ = self.subscriber?.receive(Event.completion(completion)) | |
self.subscriber?.receive(completion: .finished) | |
self.subscriber = nil | |
self.demand = .none | |
pthread_mutex_unlock(&mutex) | |
return .none | |
} | |
self.demand += demand | |
pthread_mutex_unlock(&mutex) | |
return demand | |
} | |
func downstream(_ value: Input) -> Subscribers.Demand { | |
pthread_mutex_lock(&mutex) | |
guard let subscriber = self.subscriber, | |
self.demand > .none else { | |
pthread_mutex_unlock(&mutex) | |
return .none | |
} | |
let adjust = subscriber.receive(Event.value(value)) | |
self.demand = self.demand - 1 + adjust | |
pthread_mutex_unlock(&mutex) | |
return adjust | |
} | |
func downstream(_ completion: Subscribers.Completion<Failure>) { | |
pthread_mutex_lock(&mutex) | |
if self.demand > .none { | |
_ = self.subscriber?.receive(Event.completion(completion)) | |
self.subscriber?.receive(completion: .finished) | |
self.subscriber = nil | |
} else { | |
self.completion = completion | |
} | |
pthread_mutex_unlock(&mutex) | |
} | |
func cancel() { | |
subscriber = nil | |
} | |
} | |
} | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publishers.Dematerialize { | |
private final class DownStreamHelper<S: Subscriber>: DownStreamHelperProtocol where S.Input == Output, S.Failure == Failure { | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private var subscriber: S? | |
init(subscriber: S) { | |
self.subscriber = subscriber | |
} | |
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand { | |
return demand | |
} | |
func downstream(_ event: Input) -> Subscribers.Demand { | |
switch event { | |
case let .value(inputValue): | |
return self.subscriber?.receive(inputValue) ?? .none | |
case let .completion(inputCompletetion): | |
self.subscriber?.receive(completion: inputCompletetion) | |
self.subscriber = nil | |
return .none | |
} | |
} | |
func downstream(_ completion: Subscribers.Completion<Failure>) { | |
self.subscriber?.receive(completion: .finished) | |
self.subscriber = nil | |
} | |
func cancel() { | |
subscriber = nil | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
// MARK: - Operator methods | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publisher { | |
/// Merges two publishers into a single publisher by combining each value | |
/// from self with the latest value from the second publisher, if any. | |
/// | |
/// - parameter other: Second observable source. | |
/// - parameter resultSelector: Function to invoke for each value from the self combined | |
/// with the latest value from the second source, if any. | |
/// | |
/// - returns: A publisher containing the result of combining each value of the self | |
/// with the latest value from the second publisher, if any, using the | |
/// specified result selector function. | |
func withLatestFrom<Other: Publisher, Result>(_ other: Other, | |
resultSelector: @escaping (Output, Other.Output) -> Result) | |
-> Publishers.WithLatestFrom<Self, Other, Result> { | |
return .init(upstream: self, other: other, resultSelector: resultSelector) | |
} | |
/// Upon an emission from self, emit the latest value from the | |
/// second publisher, if any exists. | |
/// | |
/// - parameter other: Second observable source. | |
/// | |
/// - returns: A publisher containing the latest value from the second publisher, if any. | |
func withLatestFrom<Other: Publisher>(_ other: Other) | |
-> Publishers.WithLatestFrom<Self, Other, Other.Output> { | |
return .init(upstream: self, other: other) { $1 } | |
} | |
} | |
// MARK: - Publisher | |
extension Publishers { | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
public struct WithLatestFrom<Upstream: Publisher, | |
Other: Publisher, | |
Output>: Publisher where Upstream.Failure == Other.Failure { | |
public typealias Failure = Upstream.Failure | |
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output | |
private let upstream: Upstream | |
private let other: Other | |
private let resultSelector: ResultSelector | |
private var latestValue: Other.Output? | |
init(upstream: Upstream, | |
other: Other, | |
resultSelector: @escaping ResultSelector) { | |
self.upstream = upstream | |
self.other = other | |
self.resultSelector = resultSelector | |
} | |
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input { | |
let helper = DownStreamHelper(subscriber: subscriber, | |
other: other, | |
resultSelector: resultSelector) | |
let sub = DownstreamSubscription(upstream: upstream, downStream: helper) | |
subscriber.receive(subscription: sub) | |
} | |
} | |
} | |
// MARK: - Subscription | |
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) | |
extension Publishers.WithLatestFrom { | |
private final class DownStreamHelper<S: Subscriber>: DownStreamHelperProtocol where S.Input == Output, S.Failure == Upstream.Failure { | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private var subscriber: S? | |
private var resultSelector: ResultSelector? | |
private var latestValue: Other.Output? | |
private var otherCancelable: Cancellable? | |
init(subscriber: S, | |
other: Other, | |
resultSelector: @escaping ResultSelector) { | |
self.subscriber = subscriber | |
self.resultSelector = resultSelector | |
self.otherCancelable = other | |
.sink(receiveCompletion: {_ in }, | |
receiveValue: { [weak self] value in | |
self?.latestValue = value | |
}) | |
} | |
func request(_ demand: Subscribers.Demand) -> Subscribers.Demand { | |
return demand | |
} | |
func downstream(_ value: Input) -> Subscribers.Demand { | |
guard let resultSelector = self.resultSelector, | |
let subscriber = self.subscriber else { return .none } | |
guard let latest = self.latestValue else { return .max(1) } | |
return subscriber.receive(resultSelector(value, latest)) | |
} | |
func downstream(_ completion: Subscribers.Completion<Failure>) { | |
self.subscriber?.receive(completion: completion) | |
self.subscriber = nil | |
} | |
func cancel() { | |
subscriber = nil | |
resultSelector = nil | |
latestValue = nil | |
otherCancelable?.cancel() | |
otherCancelable = nil | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
extension Publisher { | |
func trackActivity(with indicator: CurrentValueSubject<Bool, Never>?) -> Publishers.HandleEvents<Self> { | |
return self.handleEvents(receiveSubscription: { _ in | |
indicator?.send(true) | |
}, receiveCompletion: { _ in | |
indicator?.send(false) | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example 1 withLatestFrom:
Gives:
Example 2 withLatestFrom:
Gives:
Example 3 materialize:
Gives:
Example 4 materialize(forceRecieveCompletion: false):
Gives:
Example 5 dematerialize:
Gives: