Skip to content

Instantly share code, notes, and snippets.

@swhitty
Last active July 23, 2021 23:47
Show Gist options
  • Save swhitty/efe6393ff5a13bee3654be9227db4168 to your computer and use it in GitHub Desktop.
Save swhitty/efe6393ff5a13bee3654be9227db4168 to your computer and use it in GitHub Desktop.
// Combine.Publishers.ReceiveOn is seriously broken on iOS 13.0–2
// https://forums.swift.org/t/combine-receive-on-runloop-main-loses-sent-value-how-can-i-make-it-work/28631
// Provides fix by using OpenCombine version on iOS 13.0–2 and Combine version for iOS 13.3+
import Combine
import Foundation
@available(iOS, deprecated: 13.3, message: "use receive(on:)")
public extension Publisher {
/// Returns a publisher that simply calls `receive(on:)` on iOS 13.3 and later
/// Earlier versions used a fixed publisher from the OpenCombine project.
func receiveFix<Context: Scheduler>(
on scheduler: Context,
options: Context.SchedulerOptions? = nil
) -> ReceiveOnFix<Self, Context> {
return .init(upstream: self, scheduler: scheduler, options: options)
}
}
/// Publisher that switches between the Combine `ReceiveOn` publisher and a fixed version depending on OS version.
/// `Publishers.ReceiveOn` includes recursive lock bugs on iOS 13.0–2.
@available(iOS, deprecated: 13.3, message: "use Publishers.ReceiveOn")
public struct ReceiveOnFix<Upstream: Publisher, Context: Scheduler>: Publisher {
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
private let upstream: Upstream
private let scheduler: Context
private let options: Context.SchedulerOptions?
public init(upstream: Upstream, scheduler: Context, options: Context.SchedulerOptions?) {
self.upstream = upstream
self.scheduler = scheduler
self.options = options
}
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
if #available(iOS 13.3, *) {
let publisher = Publishers.ReceiveOn(upstream: upstream,
scheduler: scheduler,
options: options)
publisher.receive(subscriber: subscriber)
} else {
let inner = Inner(scheduler: scheduler,
options: options,
downstream: subscriber)
upstream.subscribe(inner)
}
}
}
@available(iOS, deprecated: 13.3, message: "use Publishers.ReceiveOn")
private extension ReceiveOnFix {
/// Taken from OpenCombine
/// https://github.com/OpenCombine/OpenCombine/blob/0.12.0/Sources/OpenCombine/Publishers/Publishers.ReceiveOn.swift
final class Inner<Downstream: Subscriber>
: Subscriber,
Subscription,
CustomStringConvertible,
CustomReflectable,
CustomPlaygroundDisplayConvertible
where Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure
{
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let lock = NSLock()
private let downstream: Downstream
private let scheduler: Context
private let options: Context.SchedulerOptions?
private var state = SubscriptionStatus.awaitingSubscription
private let downstreamLock = NSRecursiveLock()
init(scheduler: Context,
options: Context.SchedulerOptions?,
downstream: Downstream) {
self.downstream = downstream
self.scheduler = scheduler
self.options = options
}
func receive(subscription: Subscription) {
lock.lock()
guard case .awaitingSubscription = state else {
lock.unlock()
subscription.cancel()
return
}
state = .subscribed(subscription)
lock.unlock()
downstreamLock.lock()
downstream.receive(subscription: self)
downstreamLock.unlock()
}
func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
guard case .subscribed = state else {
lock.unlock()
return .none
}
lock.unlock()
scheduler.schedule(options: options) {
self.scheduledReceive(input)
}
return .none
}
private func scheduledReceive(_ input: Input) {
lock.lock()
guard state.subscription != nil else {
lock.unlock()
return
}
lock.unlock()
downstreamLock.lock()
let newDemand = downstream.receive(input)
downstreamLock.unlock()
if newDemand == .none { return }
lock.lock()
let subscription = state.subscription
lock.unlock()
subscription?.request(newDemand)
}
func receive(completion: Subscribers.Completion<Failure>) {
lock.lock()
guard case let .subscribed(subscription) = state else {
lock.unlock()
return
}
state = .pendingTerminal(subscription)
lock.unlock()
scheduler.schedule(options: options) {
self.scheduledReceive(completion: completion)
}
}
private func scheduledReceive(completion: Subscribers.Completion<Failure>) {
lock.lock()
state = .terminal
lock.unlock()
downstreamLock.lock()
downstream.receive(completion: completion)
downstreamLock.unlock()
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
guard case let .subscribed(subscription) = state else {
lock.unlock()
return
}
lock.unlock()
subscription.request(demand)
}
func cancel() {
lock.lock()
guard case let .subscribed(subscription) = state else {
lock.unlock()
return
}
state = .terminal
lock.unlock()
subscription.cancel()
}
var description: String { return "ReceiveOn" }
var customMirror: Mirror { return Mirror(self, children: EmptyCollection()) }
var playgroundDescription: Any { return description }
}
enum SubscriptionStatus {
case awaitingSubscription
case subscribed(Subscription)
case pendingTerminal(Subscription)
case terminal
var subscription: Subscription? {
switch self {
case .awaitingSubscription, .terminal:
return nil
case let .subscribed(subscription), let .pendingTerminal(subscription):
return subscription
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment