Skip to content

Instantly share code, notes, and snippets.

@dsxsxsxs
Last active September 18, 2020 03:45
Show Gist options
  • Save dsxsxsxs/79df8df475d5bcd4239b723e2963cda6 to your computer and use it in GitHub Desktop.
Save dsxsxsxs/79df8df475d5bcd4239b723e2963cda6 to your computer and use it in GitHub Desktop.
A Combine implementation that behaves just like RxSwift's FlatMapFirst
//
// StickToFirst.swift
//
// Created by dsxsxsxs on 2020/09/16.
//
import Foundation
import Combine
public struct StickToFirst<P, Upstream>: Publisher where P: Publisher, P == Upstream.Output, Upstream: Publisher, P.Failure == Upstream.Failure {
public typealias Output = P.Output
public typealias Failure = P.Failure
public let upstream: Upstream
public init(upstream: Upstream) {
self.upstream = upstream
}
public func receive<S>(subscriber: S) where S: Subscriber, Output == S.Input, Failure == S.Failure {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
}
}
extension StickToFirst {
private final class Subscription<Downstream: Subscriber>: Combine.Subscription where Downstream.Input == P.Output,
Downstream.Failure == P.Failure {
private var sink: Sink<Downstream>!
private var upstream: Upstream!
private var downstream: Downstream!
init(upstream: Upstream, downstream: Downstream) {
self.upstream = upstream
self.downstream = downstream
}
func request(_ demand: Subscribers.Demand) {
sink = .init(downstream: downstream, downstreamDemand: demand, receiveCompletion: { [weak self] in
switch $0 {
case .finished: ()
case .failure:
self?.downstream.receive(completion: $0)
}
self?.dispose()
})
upstream.subscribe(sink)
}
private func dispose() {
sink = nil
upstream = nil
downstream = nil
}
func cancel() {
dispose()
}
}
private final class Sink<Downstream: Subscriber>: Subscriber where P: Publisher, Downstream.Failure == Upstream.Failure, P.Failure == Upstream.Failure, P.Output == Downstream.Input {
typealias Input = P
typealias Failure = Downstream.Failure
private let lock = NSRecursiveLock()
private let downstream: Downstream
private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
private var activeInputSink: InnerSink<P, Downstream>?
private var downstreamDemand: Subscribers.Demand
init(downstream: Downstream, downstreamDemand: Subscribers.Demand, receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
self.downstream = downstream
self.downstreamDemand = downstreamDemand
self.receiveCompletion = receiveCompletion
}
func receive(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}
func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
if activeInputSink == nil {
activate(input: input)
}
lock.unlock()
return .unlimited
}
private func activate(input: Input) {
lock.lock()
let lock = self.lock
activeInputSink = .init(downstream: downstream, downstreamDemand: downstreamDemand, receiveCompletion: { [weak self] completion, restOfDemands in
lock.lock()
guard let strongSelf = self else {
lock.unlock()
return
}
strongSelf.activeInputSink = nil
strongSelf.downstreamDemand = restOfDemands
switch completion {
case .finished: ()
case .failure:
strongSelf.receive(completion: completion)
}
lock.unlock()
})
input.subscribe(activeInputSink!)
lock.unlock()
}
func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
activeInputSink = nil
self.receiveCompletion(completion)
lock.unlock()
}
}
private final class InnerSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Downstream.Failure == Upstream.Failure, Upstream.Output == Downstream.Input {
typealias Input = Upstream.Output
typealias Failure = Downstream.Failure
private let lock = NSLock()
private let downstream: Downstream
private let receiveCompletion: (Subscribers.Completion<Failure>, Subscribers.Demand) ->Void
private var downstreamDemand: Subscribers.Demand
init(downstream: Downstream, downstreamDemand: Subscribers.Demand, receiveCompletion: @escaping (Subscribers.Completion<Failure>, Subscribers.Demand) ->Void) {
self.downstreamDemand = downstreamDemand
self.downstream = downstream
self.receiveCompletion = receiveCompletion
}
func receive(subscription: Combine.Subscription) {
subscription.request(downstreamDemand)
}
func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
downstreamDemand -= 1
let newDemand = downstream.receive(input)
if newDemand > 0 {
downstreamDemand += newDemand
}
lock.unlock()
return newDemand
}
func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
let receiveCompletion = self.receiveCompletion
let demand = downstreamDemand
lock.unlock()
receiveCompletion(completion, demand)
}
}
}
extension Publisher where Self.Failure == Self.Output.Failure, Self.Output: Publisher {
public func stickToFirst() -> StickToFirst<Self.Output, Self> {
.init(upstream: self)
}
}
extension Publisher {
func flatMapFirst<T: Publisher>(_ transform: @escaping (Output) -> T) -> StickToFirst<T, Publishers.Map<Self, T>> where T.Failure == Failure {
map(transform).stickToFirst()
}
}
@dsxsxsxs
Copy link
Author

dsxsxsxs commented Sep 16, 2020

How to use

Just like switchToLatest()

publisher.map { somePublisher() }.stickToFirst()

I'm not sure whether cancel() works properly or not.

You may have your own flatMapFirstwith these codes below.

extension Publisher {
    func flatMapFirst<T: Publisher>(_ transform: @escaping (Output) -> T) -> StickToFirst<T, Publishers.Map<Self, T>> where T.Failure == Failure {
        map(transform).stickToFirst()
    }
}

Test

https://gist.github.com/dsxsxsxs/24e87d73a23b037cc2ebb1ed663d89d6

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment