-
-
Save vlondon/7eab85a087d6f0d5df69e0a6258315be to your computer and use it in GitHub Desktop.
CancelBag
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Foundation | |
/// A thread-safe store for cancellables which addresses usability pain points | |
/// with stock Combine apis. | |
/// | |
/// ## Thread-safe storage of cancellables | |
/// | |
/// let cancelBag = CancelBag() | |
/// cancellable.store(in: cancelBag) | |
/// cancelBag.remove(cancellable) | |
/// | |
/// ## Memory consumption | |
/// | |
/// Use case: keep a cancellable alive until the cancelBag is drained, but | |
/// remove them from memory once the subscription completes or is cancelled. | |
/// | |
/// // Releases memory when subscription completes or is cancelled. | |
/// publisher.sink( | |
/// in: cancelBag, | |
/// receiveCompletion: ... | |
/// receiveValue: ...) | |
/// | |
/// // Manual cancellation is still possible | |
/// let cancellable = publisher.sink( | |
/// in: cancelBag, | |
/// receiveCompletion: ... | |
/// receiveValue: ...) | |
/// cancellable.cancel() | |
/// | |
/// ## Important | |
/// | |
/// CancelBag cancels its cancellables when it is deinitialized. | |
final class CancelBag { | |
var isEmpty: Bool { synchronized { cancellables.isEmpty } } | |
private var lock = NSRecursiveLock() // Allow reentrancy | |
private var cancellables: [AnyCancellable] = [] | |
private var isCancelling = false | |
deinit { | |
cancel() | |
} | |
func remove(_ cancellable: AnyCancellable) { | |
synchronized { | |
if let index = cancellables.firstIndex(where: { $0 === cancellable }) { | |
cancellables.remove(at: index) | |
} | |
} | |
} | |
fileprivate func store<T: Cancellable>(_ cancellable: T) { | |
synchronized { | |
if let any = cancellable as? AnyCancellable { | |
// Don't lose cancellable identity, so that we can remove it. | |
cancellables.append(any) | |
} else { | |
cancellable.store(in: &cancellables) | |
} | |
} | |
} | |
private func synchronized<T>(_ execute: () throws -> T) rethrows -> T { | |
lock.lock() | |
defer { lock.unlock() } | |
return try execute() | |
} | |
} | |
extension CancelBag: Cancellable { | |
func cancel() { | |
synchronized { | |
// Avoid exclusive access violation: each cancellable may trigger a | |
// call to remove(_:), and mutate self.cancellables | |
let cancellables = self.cancellables | |
for cancellable in cancellables { | |
cancellable.cancel() | |
} | |
// OK, they are all cancelled now | |
self.cancellables = [] | |
} | |
} | |
} | |
extension Cancellable { | |
func store(in bag: CancelBag) { | |
bag.store(self) | |
} | |
} | |
extension Publisher { | |
/// Attaches a subscriber with closure-based behavior. | |
/// | |
/// This method creates the subscriber and immediately requests an unlimited | |
/// number of values. | |
/// | |
/// The returned cancellable is added to cancelBag, and removed when | |
/// publisher completes. | |
/// | |
/// - parameter cancelBag: A CancelBag instance. | |
/// - parameter receiveComplete: The closure to execute on completion. | |
/// - parameter receiveValue: The closure to execute on receipt of a value. | |
/// - returns: An AnyCancellable instance. | |
@discardableResult | |
func sink( | |
in cancelBag: CancelBag, | |
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void, | |
receiveValue: @escaping (Output) -> Void) | |
-> AnyCancellable | |
{ | |
var cancellable: AnyCancellable? | |
// Prevents a retain cycle when cancellable retains itself | |
var unmanagedCancellable: Unmanaged<AnyCancellable>? | |
cancellable = self | |
.handleEvents( | |
receiveCancel: { [weak cancelBag] in | |
// Postpone cleanup in case subscription finishes | |
// before cancellable is set. | |
if let unmanagedCancellable = unmanagedCancellable { | |
cancelBag?.remove(unmanagedCancellable.takeUnretainedValue()) | |
unmanagedCancellable.release() | |
} else { | |
DispatchQueue.main.async { | |
if let unmanagedCancellable = unmanagedCancellable { | |
cancelBag?.remove(unmanagedCancellable.takeUnretainedValue()) | |
unmanagedCancellable.release() | |
} | |
} | |
} | |
}) | |
.sink( | |
receiveCompletion: { [weak cancelBag] completion in | |
receiveCompletion(completion) | |
// Postpone cleanup in case subscription finishes | |
// before cancellable is set. | |
if let unmanagedCancellable = unmanagedCancellable { | |
cancelBag?.remove(unmanagedCancellable.takeUnretainedValue()) | |
unmanagedCancellable.release() | |
} else { | |
DispatchQueue.main.async { | |
if let unmanagedCancellable = unmanagedCancellable { | |
cancelBag?.remove(unmanagedCancellable.takeUnretainedValue()) | |
unmanagedCancellable.release() | |
} | |
} | |
} | |
}, | |
receiveValue: receiveValue) | |
unmanagedCancellable = Unmanaged.passRetained(cancellable!) | |
cancellable!.store(in: cancelBag) | |
return cancellable! | |
} | |
} | |
extension Publisher where Failure == Never { | |
/// Attaches a subscriber with closure-based behavior. | |
/// | |
/// This method creates the subscriber and immediately requests an unlimited | |
/// number of values. | |
/// | |
/// The returned cancellable is added to cancelBag, and removed when | |
/// publisher completes. | |
/// | |
/// - parameter cancelBag: A CancelBag instance. | |
/// - parameter receiveValue: The closure to execute on receipt of a value. | |
/// - returns: An AnyCancellable instance. | |
@discardableResult | |
func sink( | |
in cancelBag: CancelBag, | |
receiveValue: @escaping (Output) -> Void) | |
-> AnyCancellable | |
{ | |
sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: receiveValue) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import XCTest | |
final class CancelBagTests: XCTestCase { | |
func testCancelBagExplicitCancel() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
.store(in: cancelBag) | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
func testCancelBagExplicitCancelRetainingCancellable() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
cancellable.store(in: cancelBag) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
} | |
func testCancelBagImplicitCancelWhenDeinitialized() { | |
var cancelBag: CancelBag? = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
.store(in: cancelBag!) | |
XCTAssertFalse(isCancelled) | |
cancelBag = nil | |
XCTAssertTrue(isCancelled) | |
} | |
func testCancelBagImplicitCancelWhenDeinitializedRetainingCancellable() { | |
var cancelBag: CancelBag? = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
cancellable.store(in: cancelBag!) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(isCancelled) | |
cancelBag = nil | |
XCTAssertTrue(isCancelled) | |
} | |
} | |
func testCancelBagAcceptsExternalCancellation() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
cancellable.store(in: cancelBag) | |
XCTAssertFalse(isCancelled) | |
cancellable.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
func testCancelBagAcceptsExternalCancellationRetainingCancellable() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(receiveValue: { _ in }) | |
cancellable.store(in: cancelBag) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(isCancelled) | |
cancellable.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
} | |
// MARK: - Sink | |
func testCancelBagSinkJust() { | |
let cancelBag = CancelBag() | |
let publisher = Just(0) | |
var isCancelled = false | |
publisher | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertFalse(isCancelled) // too late | |
} | |
func testCancelBagSinkEmpty() { | |
let cancelBag = CancelBag() | |
let publisher = Empty(outputType: Void.self, failureType: Never.self) | |
var isCancelled = false | |
publisher | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertFalse(isCancelled) // too late | |
} | |
func testCancelBagSinkFail() { | |
struct TestError: Error { } | |
let cancelBag = CancelBag() | |
let publisher = Fail(outputType: Void.self, failure: TestError()) | |
var isCancelled = false | |
publisher | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in }) | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertFalse(isCancelled) // too late | |
} | |
func testCancelBagSinkExplicitCancel() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
func testCancelBagSinkExplicitCancelRetainingCancellable() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag, receiveValue: { _ in }) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(isCancelled) | |
cancelBag.cancel() | |
XCTAssertTrue(isCancelled) | |
} | |
} | |
func testCancelBagSinkImplicitCancelWhenDeinitialized() { | |
var cancelBag: CancelBag? = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag!, receiveValue: { _ in }) | |
XCTAssertFalse(isCancelled) | |
cancelBag = nil | |
XCTAssertTrue(isCancelled) | |
} | |
func testCancelBagSinkImplicitCancelWhenDeinitializedRetainingCancellable() { | |
var cancelBag: CancelBag? = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
var isCancelled = false | |
let cancellable = subject | |
.handleEvents(receiveCancel: { isCancelled = true }) | |
.sink(in: cancelBag!, receiveValue: { _ in }) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(isCancelled) | |
cancelBag = nil | |
XCTAssertTrue(isCancelled) | |
} | |
} | |
func testCancelBagSinkReleasesMemoryOnCancellation() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
subject.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(cancelBag.isEmpty) | |
cancelBag.cancel() | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
func testCancelBagSinkReleasesMemoryOnCancellationRetainingCancellable() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
let cancellable = subject.sink(in: cancelBag, receiveValue: { _ in }) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(cancelBag.isEmpty) | |
cancelBag.cancel() | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
} | |
func testCancelBagSinkReleasesMemoryOnCompletionFinished() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
subject.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(cancelBag.isEmpty) | |
subject.send(completion: .finished) | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
func testCancelBagSinkEventuallyReleasesMemoryOnCompletionFinishedImmediate() { | |
let cancelBag = CancelBag() | |
let publisher = Empty<Void, Never>() | |
publisher.sink(in: cancelBag, receiveValue: { _ in }) | |
XCTAssertFalse(cancelBag.isEmpty) | |
let expectation = self.expectation(description: "Empty cancelBag") | |
DispatchQueue.main.async { | |
XCTAssertTrue(cancelBag.isEmpty) | |
expectation.fulfill() | |
} | |
waitForExpectations(timeout: 1, handler: nil) | |
} | |
func testCancelBagSinkReleasesMemoryOnCompletionFinishedRetainingCancellable() { | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, Never>() | |
let cancellable = subject.sink(in: cancelBag, receiveValue: { _ in }) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(cancelBag.isEmpty) | |
subject.send(completion: .finished) | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
} | |
func testCancelBagSinkReleasesMemoryOnCompletionFailure() { | |
struct TestError: Error { } | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, TestError>() | |
subject.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in }) | |
XCTAssertFalse(cancelBag.isEmpty) | |
subject.send(completion: .failure(TestError())) | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
func testCancelBagSinkEventuallyReleasesMemoryOnCompletionFailureImmediate() { | |
struct TestError: Error { } | |
let cancelBag = CancelBag() | |
let publisher = Fail<Void, TestError>(error: TestError()) | |
publisher.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in }) | |
XCTAssertFalse(cancelBag.isEmpty) | |
let expectation = self.expectation(description: "Empty cancelBag") | |
DispatchQueue.main.async { | |
XCTAssertTrue(cancelBag.isEmpty) | |
expectation.fulfill() | |
} | |
waitForExpectations(timeout: 1, handler: nil) | |
} | |
func testCancelBagSinkReleasesMemoryOnCompletionFailureRetainingCancellable() { | |
struct TestError: Error { } | |
let cancelBag = CancelBag() | |
let subject = PassthroughSubject<Void, TestError>() | |
let cancellable = subject.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in }) | |
withExtendedLifetime(cancellable) { | |
XCTAssertFalse(cancelBag.isEmpty) | |
subject.send(completion: .failure(TestError())) | |
XCTAssertTrue(cancelBag.isEmpty) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment