Last active
September 18, 2020 03:45
-
-
Save dsxsxsxs/79df8df475d5bcd4239b723e2963cda6 to your computer and use it in GitHub Desktop.
A Combine implementation that behaves just like RxSwift's FlatMapFirst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// StickToFirst.swift | |
// | |
// Created by dsxsxsxs on 2020/09/16. | |
// | |
import Foundation | |
import Combine | |
public struct StickToFirst<P, Upstream>: Publisher where P: Publisher, P == Upstream.Output, Upstream: Publisher, P.Failure == Upstream.Failure { | |
public typealias Output = P.Output | |
public typealias Failure = P.Failure | |
public let upstream: Upstream | |
public init(upstream: Upstream) { | |
self.upstream = upstream | |
} | |
public func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure { | |
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber)) | |
} | |
} | |
extension StickToFirst { | |
private final class Subscription<Downstream: Subscriber>: Combine.Subscription where Downstream.Input == P.Output, | |
Downstream.Failure == P.Failure { | |
private var sink: Sink<Downstream>! | |
private var upstream: Upstream! | |
private var downstream: Downstream! | |
init(upstream: Upstream, downstream: Downstream) { | |
self.upstream = upstream | |
self.downstream = downstream | |
} | |
func request(_ demand: Subscribers.Demand) { | |
sink = .init(downstream: downstream, downstreamDemand: demand, receiveCompletion: { [weak self] in | |
switch $0 { | |
case .finished: () | |
case .failure: | |
self?.downstream.receive(completion: $0) | |
} | |
self?.dispose() | |
}) | |
upstream.subscribe(sink) | |
} | |
private func dispose() { | |
sink = nil | |
upstream = nil | |
downstream = nil | |
} | |
func cancel() { | |
dispose() | |
} | |
} | |
private final class Sink<Downstream: Subscriber>: Subscriber where P: Publisher, Downstream.Failure == Upstream.Failure, P.Failure == Upstream.Failure, P.Output == Downstream.Input { | |
typealias Input = P | |
typealias Failure = Downstream.Failure | |
private let lock = NSRecursiveLock() | |
private let downstream: Downstream | |
private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void | |
private var activeInputSink: InnerSink<P, Downstream>? | |
private var downstreamDemand: Subscribers.Demand | |
init(downstream: Downstream, downstreamDemand: Subscribers.Demand, receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) { | |
self.downstream = downstream | |
self.downstreamDemand = downstreamDemand | |
self.receiveCompletion = receiveCompletion | |
} | |
func receive(subscription: Combine.Subscription) { | |
subscription.request(.unlimited) | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
lock.lock() | |
if activeInputSink == nil { | |
activate(input: input) | |
} | |
lock.unlock() | |
return .unlimited | |
} | |
private func activate(input: Input) { | |
lock.lock() | |
let lock = self.lock | |
activeInputSink = .init(downstream: downstream, downstreamDemand: downstreamDemand, receiveCompletion: { [weak self] completion, restOfDemands in | |
lock.lock() | |
guard let strongSelf = self else { | |
lock.unlock() | |
return | |
} | |
strongSelf.activeInputSink = nil | |
strongSelf.downstreamDemand = restOfDemands | |
switch completion { | |
case .finished: () | |
case .failure: | |
strongSelf.receive(completion: completion) | |
} | |
lock.unlock() | |
}) | |
input.subscribe(activeInputSink!) | |
lock.unlock() | |
} | |
func receive(completion: Subscribers.Completion<Failure>) { | |
lock.lock() | |
activeInputSink = nil | |
self.receiveCompletion(completion) | |
lock.unlock() | |
} | |
} | |
private final class InnerSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Downstream.Failure == Upstream.Failure, Upstream.Output == Downstream.Input { | |
typealias Input = Upstream.Output | |
typealias Failure = Downstream.Failure | |
private let lock = NSLock() | |
private let downstream: Downstream | |
private let receiveCompletion: (Subscribers.Completion<Failure>, Subscribers.Demand) ->Void | |
private var downstreamDemand: Subscribers.Demand | |
init(downstream: Downstream, downstreamDemand: Subscribers.Demand, receiveCompletion: @escaping (Subscribers.Completion<Failure>, Subscribers.Demand) ->Void) { | |
self.downstreamDemand = downstreamDemand | |
self.downstream = downstream | |
self.receiveCompletion = receiveCompletion | |
} | |
func receive(subscription: Combine.Subscription) { | |
subscription.request(downstreamDemand) | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
lock.lock() | |
downstreamDemand -= 1 | |
let newDemand = downstream.receive(input) | |
if newDemand > 0 { | |
downstreamDemand += newDemand | |
} | |
lock.unlock() | |
return newDemand | |
} | |
func receive(completion: Subscribers.Completion<Failure>) { | |
lock.lock() | |
let receiveCompletion = self.receiveCompletion | |
let demand = downstreamDemand | |
lock.unlock() | |
receiveCompletion(completion, demand) | |
} | |
} | |
} | |
extension Publisher where Self.Failure == Self.Output.Failure, Self.Output: Publisher { | |
public func stickToFirst() -> StickToFirst<Self.Output, Self> { | |
.init(upstream: self) | |
} | |
} | |
extension Publisher { | |
func flatMapFirst<T: Publisher>(_ transform: @escaping (Output) -> T) -> StickToFirst<T, Publishers.Map<Self, T>> where T.Failure == Failure { | |
map(transform).stickToFirst() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How to use
Just like
switchToLatest()
I'm not sure whether
cancel()
works properly or not.You may have your own
flatMapFirst
with these codes below.Test
https://gist.github.com/dsxsxsxs/24e87d73a23b037cc2ebb1ed663d89d6