Last active
July 23, 2021 23:47
-
-
Save swhitty/efe6393ff5a13bee3654be9227db4168 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Combine's Publishers.ReceiveOn is seriously broken on iOS 13.0–13.2.
// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631
// This file works around the bug by using the OpenCombine implementation on iOS 13.0–13.2 and the Combine implementation on iOS 13.3+.
import Combine | |
import Foundation | |
@available(iOS, deprecated: 13.3, message: "use receive(on:)")
public extension Publisher {

    /// Wraps this publisher in a `ReceiveOnFix` that delivers events on `scheduler`.
    ///
    /// On iOS 13.3 and later the wrapper simply forwards to Combine's own
    /// `receive(on:)` machinery; on earlier versions it uses a fixed publisher
    /// taken from the OpenCombine project.
    ///
    /// - Parameters:
    ///   - scheduler: The scheduler on which downstream receives events.
    ///   - options: Scheduler options forwarded untouched; defaults to `nil`.
    /// - Returns: A `ReceiveOnFix` whose upstream is this publisher.
    func receiveFix<Context: Scheduler>(
        on scheduler: Context,
        options: Context.SchedulerOptions? = nil
    ) -> ReceiveOnFix<Self, Context> {
        ReceiveOnFix(upstream: self, scheduler: scheduler, options: options)
    }
}
/// Publisher that switches between the Combine `ReceiveOn` publisher and a fixed
/// version depending on OS version.
///
/// `Publishers.ReceiveOn` includes recursive lock bugs on iOS 13.0–13.2, so below
/// iOS 13.3 subscribers are attached through the private `Inner` subscriber instead.
@available(iOS, deprecated: 13.3, message: "use Publishers.ReceiveOn")
public struct ReceiveOnFix<Upstream: Publisher, Context: Scheduler>: Publisher {

    public typealias Output = Upstream.Output
    public typealias Failure = Upstream.Failure

    /// The publisher whose events are re-delivered on `scheduler`.
    private let upstream: Upstream
    /// The scheduler on which downstream receives events.
    private let scheduler: Context
    /// Options handed to the scheduler (or to `Publishers.ReceiveOn`) unchanged.
    private let options: Context.SchedulerOptions?

    /// Creates a publisher that re-delivers `upstream`'s events on `scheduler`.
    ///
    /// - Parameters:
    ///   - upstream: The publisher to receive events from.
    ///   - scheduler: The scheduler used to deliver events downstream.
    ///   - options: Optional scheduler options; may be `nil`.
    public init(upstream: Upstream, scheduler: Context, options: Context.SchedulerOptions?) {
        self.upstream = upstream
        self.scheduler = scheduler
        self.options = options
    }

    /// Attaches `subscriber`, choosing the implementation by OS version.
    ///
    /// - Parameter subscriber: The downstream subscriber to attach.
    public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
        if #available(iOS 13.3, *) {
            // Attach through `subscribe(_:)` rather than calling
            // `receive(subscriber:)` directly, matching the fallback branch below.
            let publisher = Publishers.ReceiveOn(upstream: upstream,
                                                 scheduler: scheduler,
                                                 options: options)
            publisher.subscribe(subscriber)
        } else {
            // Older systems: route events through the OpenCombine-derived subscriber.
            let inner = Inner(scheduler: scheduler,
                              options: options,
                              downstream: subscriber)
            upstream.subscribe(inner)
        }
    }
}
@available(iOS, deprecated: 13.3, message: "use Publishers.ReceiveOn")
private extension ReceiveOnFix {

    /// Subscriber that sits between upstream and `downstream`, hopping every value
    /// and the completion onto `scheduler` before forwarding them. It also acts as
    /// the `Subscription` handed to `downstream`.
    ///
    /// Taken from OpenCombine
    /// https://github.com/OpenCombine/OpenCombine/blob/0.12.0/Sources/OpenCombine/Publishers/Publishers.ReceiveOn.swift
    final class Inner<Downstream: Subscriber>
        : Subscriber,
          Subscription,
          CustomStringConvertible,
          CustomReflectable,
          CustomPlaygroundDisplayConvertible
        where Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure
    {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        /// Guards `state`; never held while calling out to upstream or downstream.
        private let lock = NSLock()
        /// Held around every call into `downstream`.
        private let downstreamLock = NSRecursiveLock()

        private let downstream: Downstream
        private let scheduler: Context
        private let options: Context.SchedulerOptions?
        private var state = SubscriptionStatus.awaitingSubscription

        init(scheduler: Context,
             options: Context.SchedulerOptions?,
             downstream: Downstream) {
            self.downstream = downstream
            self.scheduler = scheduler
            self.options = options
        }

        // MARK: - Subscriber

        /// Stores the upstream subscription and hands `self` to downstream as its
        /// subscription. A subscription arriving in any state other than
        /// `.awaitingSubscription` is cancelled immediately.
        func receive(subscription: Subscription) {
            lock.lock()
            let accepted: Bool
            if case .awaitingSubscription = state {
                state = .subscribed(subscription)
                accepted = true
            } else {
                accepted = false
            }
            lock.unlock()

            guard accepted else {
                subscription.cancel()
                return
            }

            downstreamLock.lock()
            downstream.receive(subscription: self)
            downstreamLock.unlock()
        }

        /// Schedules delivery of `input` on `scheduler` while subscribed.
        ///
        /// Always returns `.none`; any demand produced by downstream is requested
        /// from upstream later, inside `scheduledReceive(_:)`.
        func receive(_ input: Input) -> Subscribers.Demand {
            lock.lock()
            let isSubscribed: Bool
            switch state {
            case .subscribed:
                isSubscribed = true
            case .awaitingSubscription, .pendingTerminal, .terminal:
                isSubscribed = false
            }
            lock.unlock()

            if isSubscribed {
                scheduler.schedule(options: options) {
                    self.scheduledReceive(input)
                }
            }
            return .none
        }

        /// Runs on the scheduler: forwards `input` downstream and passes any
        /// additional demand back to the upstream subscription.
        private func scheduledReceive(_ input: Input) {
            // Deliver only while a subscription is still held
            // (`.subscribed` or `.pendingTerminal`).
            lock.lock()
            let stillAttached = state.subscription != nil
            lock.unlock()
            guard stillAttached else { return }

            downstreamLock.lock()
            let additionalDemand = downstream.receive(input)
            downstreamLock.unlock()
            guard additionalDemand != .none else { return }

            // Re-read the subscription: the state may have changed while
            // downstream was processing the value.
            lock.lock()
            let upstreamSubscription = state.subscription
            lock.unlock()
            upstreamSubscription?.request(additionalDemand)
        }

        /// Moves from `.subscribed` to `.pendingTerminal` and schedules delivery of
        /// `completion`; ignored in every other state.
        func receive(completion: Subscribers.Completion<Failure>) {
            lock.lock()
            if case let .subscribed(upstreamSubscription) = state {
                state = .pendingTerminal(upstreamSubscription)
                lock.unlock()
            } else {
                lock.unlock()
                return
            }

            scheduler.schedule(options: options) {
                self.scheduledReceive(completion: completion)
            }
        }

        /// Runs on the scheduler: marks the state terminal, then forwards
        /// `completion` downstream.
        private func scheduledReceive(completion: Subscribers.Completion<Failure>) {
            lock.lock()
            state = .terminal
            lock.unlock()

            downstreamLock.lock()
            defer { downstreamLock.unlock() }
            downstream.receive(completion: completion)
        }

        // MARK: - Subscription

        /// Forwards `demand` to the upstream subscription while `.subscribed`.
        func request(_ demand: Subscribers.Demand) {
            lock.lock()
            let upstreamSubscription: Subscription?
            if case let .subscribed(active) = state {
                upstreamSubscription = active
            } else {
                upstreamSubscription = nil
            }
            lock.unlock()

            upstreamSubscription?.request(demand)
        }

        /// Cancels the upstream subscription and becomes terminal; does nothing
        /// unless currently `.subscribed`.
        func cancel() {
            lock.lock()
            var subscriptionToCancel: Subscription?
            if case let .subscribed(active) = state {
                subscriptionToCancel = active
                state = .terminal
            }
            lock.unlock()

            subscriptionToCancel?.cancel()
        }

        // MARK: - Debug descriptions

        var description: String { "ReceiveOn" }

        var customMirror: Mirror { Mirror(self, children: EmptyCollection()) }

        var playgroundDescription: Any { description }
    }

    /// Lifecycle of an `Inner` instance.
    enum SubscriptionStatus {
        /// No upstream subscription has been received yet.
        case awaitingSubscription
        /// Holding the upstream subscription.
        case subscribed(Subscription)
        /// Upstream has completed; delivery of the completion is scheduled.
        case pendingTerminal(Subscription)
        /// Finished or cancelled; no subscription is held.
        case terminal

        /// The held upstream subscription, or `nil` when none is stored.
        var subscription: Subscription? {
            switch self {
            case let .subscribed(active):
                return active
            case let .pendingTerminal(pending):
                return pending
            case .awaitingSubscription, .terminal:
                return nil
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment