Last active
October 30, 2020 13:11
-
-
Save groue/6c6059fb89620dc93d0f4905c1f9cb69 to your computer and use it in GitHub Desktop.
AsynchronousOperation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
/// An `Operation` that finishes when it is given a result or is cancelled,
/// regardless of when `main()` returns.
///
/// To create an operation:
///
/// 1. Subclass AsynchronousOperation, override main, and eventually cancel the
///    operation, or set result to a non-nil value.
///
/// 2. Use `AsynchronousOperation.blockOperation { op in ... }`, and eventually
///    cancel the operation, or set result to a non-nil value.
open class AsynchronousOperation<Output, Failure: Error>: Operation {
    /// The outcome of the operation, or nil when no result has been set.
    ///
    /// Setting the result to a non-nil value finishes the operation.
    ///
    /// If operation is already finished, setting the result has no effect.
    /// Setting the result to nil has no effect either: an operation only
    /// completes without a result when it is cancelled.
    var result: Result<Output, Failure>? {
        // Read under the same lock that guards every write of `_result`.
        get { return synchronized { _result } }
        set {
            // Ignore nil: finishing without a result and without cancellation
            // would break the invariant asserted by `handleCompletion`.
            guard let outcome = newValue else { return }
            synchronized {
                if _isFinished { return }
                if _isEarlyFinished { return }
                if _isExecuting {
                    willChangeValue(forKey: isExecutingKey)
                    willChangeValue(forKey: isFinishedKey)
                    _isExecuting = false
                    _result = outcome
                    _isFinished = true
                    didChangeValue(forKey: isFinishedKey)
                    didChangeValue(forKey: isExecutingKey)
                } else {
                    // Not started yet: remember the result, and let start()
                    // publish the "isFinished" change later.
                    _isEarlyFinished = true
                    _result = outcome
                }
                didComplete()
            }
        }
    }

    /// Cancels the operation, and completes it without any result.
    ///
    /// If operation is already completed, cancelling has no effect.
    override open func cancel() {
        synchronized {
            if _isFinished { return }
            if _isEarlyFinished { return }
            if _isCancelled == false {
                willChangeValue(forKey: isCancelledKey)
                _isCancelled = true
                didChangeValue(forKey: isCancelledKey)
            }
            if _isExecuting {
                super.cancel()
                willChangeValue(forKey: isExecutingKey)
                willChangeValue(forKey: isFinishedKey)
                _isExecuting = false
                _isFinished = true
                didChangeValue(forKey: isFinishedKey)
                didChangeValue(forKey: isExecutingKey)
            } else {
                // Not started yet: start() publishes "isFinished" later.
                _isEarlyFinished = true
            }
            didComplete()
        }
    }

    /// Updates the `completionBlock` of the operation so that it performs the
    /// provided result handler.
    ///
    /// The result handler is executed when the operation completes
    /// successfully, completes with an error, or is cancelled.
    ///
    /// The result handler is executed on the specified dispatch queue, which
    /// defaults to `DispatchQueue.main`. When the queue is nil, the result
    /// handler is executed right within the operation's `completionBlock`.
    ///
    /// The result handler can refer to the operation without preventing the
    /// operation to deallocate after completion.
    ///
    /// - parameter queue: The `DispatchQueue` which runs the result handler.
    ///   When the queue is nil, the completion block is executed right within
    ///   the operation's `completionBlock`.
    /// - parameter resultHandler: A function that runs when the operation
    ///   is finished.
    /// - parameter result: The result of the operation. If nil, the
    ///   operation was cancelled.
    public func handleCompletion(
        onQueue queue: DispatchQueue? = DispatchQueue.main,
        result resultHandler: @escaping (_ result: Result<Output, Failure>?) -> Void)
    {
        completionBlock = { [unowned self] in
            // Grab the result before hopping queues, so that the handler
            // does not need the operation to stay alive.
            let result = self.result
            assert(result != nil || self.isCancelled)
            if let queue = queue {
                queue.async {
                    resultHandler(result)
                }
            } else {
                resultHandler(result)
            }
        }
    }

    /// Updates the `completionBlock` of the operation so that it performs the
    /// provided result handler.
    ///
    /// The result handler is executed when the operation completes
    /// successfully, completes with an error, but not when the operation
    /// is cancelled.
    ///
    /// The result handler is executed on the specified dispatch queue, which
    /// defaults to `DispatchQueue.main`. When the queue is nil, the result
    /// handler is executed right within the operation's `completionBlock`.
    ///
    /// The result handler can refer to the operation without preventing the
    /// operation to deallocate after completion.
    ///
    /// - parameter queue: The `DispatchQueue` which runs the result handler.
    ///   When the queue is nil, the completion block is executed right within
    ///   the operation's `completionBlock`.
    /// - parameter resultHandler: A function that runs when the operation
    ///   is finished.
    /// - parameter result: The result of the operation.
    public func handleResult(
        onQueue queue: DispatchQueue? = DispatchQueue.main,
        with resultHandler: @escaping (_ result: Result<Output, Failure>) -> Void)
    {
        handleCompletion(onQueue: queue) { result in
            if let result = result {
                resultHandler(result)
            }
        }
    }

    /// Returns an operation which runs `block` when it starts.
    ///
    /// - parameter block: A function that receives the operation, and is
    ///   expected to eventually cancel it, or set its result.
    public static func blockOperation(_ block: @escaping (AsynchronousOperation<Output, Failure>) -> Void) -> AsynchronousOperation<Output, Failure> {
        return AsynchronousBlockOperation(block)
    }

    /// Don't override. Override main() instead.
    override public func start() {
        synchronized {
            if _isEarlyFinished {
                // Completed before start: main() is skipped, and the
                // delayed "isFinished" change is published now.
                willChangeValue(forKey: isFinishedKey)
                _isFinished = true
                didChangeValue(forKey: isFinishedKey)
            } else {
                willChangeValue(forKey: isExecutingKey)
                _isExecuting = true
                didChangeValue(forKey: isExecutingKey)
                main()
            }
        }
    }

    /// Called after the result has been set, or operation was cancelled.
    ///
    /// Subclasses can override this method. Default implementation
    /// does nothing.
    ///
    /// - warning: Don't assume the `isFinished` flag is set, because
    ///   an asynchronous operation may get a result, or be cancelled, before it
    ///   is scheduled in an operation queue.
    open func didComplete() { }

    override open var isAsynchronous: Bool { return true }
    override open var isExecuting: Bool { return _isExecuting }
    override open var isFinished: Bool { return _isFinished }
    override open var isCancelled: Bool { return _isCancelled }

    private var _result: Result<Output, Failure>? = nil

    /// Recursive, because KVO observers and `didComplete()` run while the
    /// lock is held, and `start()` calls `main()` while holding it.
    private let lock = NSRecursiveLock()
    private var _isCancelled: Bool = false
    private var _isExecuting: Bool = false
    private var _isFinished: Bool = false

    /// Set to true whenever the operation is completed before the operation
    /// queue has called the start() method, and the operation has become
    /// "executing". We have to wait until start() is called before we trigger
    /// the "isFinished" KVO notifications, or we get warnings in the console,
    /// and even hard crashes.
    private var _isEarlyFinished: Bool = false

    /// Runs `execute` while holding the operation lock, and returns its result.
    private func synchronized<T>(_ execute: () throws -> T) rethrows -> T {
        lock.lock()
        defer { lock.unlock() }
        return try execute()
    }
}
// Key names used for the manual KVO notifications of the operation state.

/// KVO key of `Operation.isCancelled`.
private let isCancelledKey: String = "isCancelled"

/// KVO key of `Operation.isExecuting`.
private let isExecutingKey: String = "isExecuting"

/// KVO key of `Operation.isFinished`.
private let isFinishedKey: String = "isFinished"
/// An asynchronous operation whose `main()` hands the operation itself to a
/// stored closure.
private class AsynchronousBlockOperation<Output, Failure: Error>: AsynchronousOperation<Output, Failure> {
    /// The closure type run by the operation.
    typealias Block = (AsynchronousOperation<Output, Failure>) -> Void

    /// The closure run by `main()`.
    let block: Block

    /// - parameter block: A function that receives the operation.
    init(_ block: @escaping Block) {
        self.block = block
    }

    override func main() {
        let run = block
        run(self)
    }
}
extension OperationQueue {
    /// Creates an operation queue, and assigns its name, quality of service,
    /// and maximum number of concurrent operations.
    ///
    /// - parameter name: The name of the queue. Defaults to nil.
    /// - parameter qualityOfService: The quality of service of the queue.
    /// - parameter maxConcurrentOperationCount: The maximum number of
    ///   operations that run at the same time. Defaults to
    ///   `OperationQueue.defaultMaxConcurrentOperationCount`.
    public convenience init(
        name: String? = nil,
        qualityOfService: QualityOfService,
        maxConcurrentOperationCount: Int = OperationQueue.defaultMaxConcurrentOperationCount)
    {
        self.init()
        self.maxConcurrentOperationCount = maxConcurrentOperationCount
        self.qualityOfService = qualityOfService
        self.name = name
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import XCTest | |
/// Tests that `AsynchronousOperation` completes, and reports its completion
/// to an `OperationQueue`, whenever it gets a result or is cancelled: from
/// `main()`, after `main()` has returned, before `start()`, and before the
/// operation is enqueued.
class AsynchronousOperationTests: XCTestCase {
    // TODO: add tests for start() called without any OperationQueue.
    // TODO: do we have to do anything regarding the isReady flag?

    // MARK: - Support

    /// The completion a test expects from its operation.
    private enum ExpectedOutcome {
        /// The operation completes with this value.
        case success(Int)
        /// The operation completes with an error.
        case failure
        /// The operation is cancelled, and completes without any result.
        case cancellation
    }

    /// Installs a completion handler on the operation which checks that the
    /// handler runs on the main thread with the expected outcome, and
    /// fulfills an expectation the test must wait for
    /// with `waitForExpectations`.
    ///
    /// - parameter operation: The tested operation.
    /// - parameter expected: The expected outcome.
    /// - parameter description: The description of the expectation.
    /// - parameter file: The file reported on failure.
    /// - parameter line: The line reported on failure.
    private func expectCompletion<Failure: Error>(
        of operation: AsynchronousOperation<Int, Failure>,
        _ expected: ExpectedOutcome,
        description: String = "Completion",
        file: StaticString = #file,
        line: UInt = #line)
    {
        let expectation = self.expectation(description: description)
        operation.handleCompletion { result in
            XCTAssertTrue(
                Thread.isMainThread,
                "completion should be handled from the main thread",
                file: file, line: line)
            switch (result, expected) {
            case let (.success(value)?, .success(expectedValue)):
                XCTAssertEqual(value, expectedValue, file: file, line: line)
            case (.failure?, .failure), (.none, .cancellation):
                break
            case (.none, _):
                XCTFail("Expected result", file: file, line: line)
            case (.success?, _):
                XCTFail("Unexpected value", file: file, line: line)
            case (.failure?, _):
                XCTFail("Unexpected error", file: file, line: line)
            }
            expectation.fulfill()
        }
    }

    /// Waits for the operation to complete. This tests makes sure the operation
    /// is well behaved and correctly notifies its completion to an
    /// operation queue.
    ///
    /// - parameter operation: The tested operation.
    /// - parameter afterEnqueue: a function to execute after the operation
    ///   has been added to an operation queue, but before the operation has
    ///   started.
    private func waitForOperationCompletion(_ operation: Operation, afterEnqueue: (() -> Void)? = nil) {
        // Makes sure the afterEnqueue function is called before the tested
        // operation starts: the operation depends on a gate that stays
        // closed until the semaphore is signaled.
        let gate = DispatchSemaphore(value: 0)
        let gateOperation = BlockOperation { gate.wait() }
        operation.addDependency(gateOperation)

        // Makes sure the operation completes "normally", according to
        // OperationQueue, by waiting for a dependent operation.
        let completion = self.expectation(description: "Operation Completion")
        let finishOperation = BlockOperation { completion.fulfill() }
        finishOperation.addDependency(operation)

        let queue = OperationQueue()
        queue.addOperation(gateOperation)
        queue.addOperation(operation)
        queue.addOperation(finishOperation)

        afterEnqueue?()
        gate.signal()
        wait(for: [completion], timeout: 1)
    }

    // MARK: - Finish synchronously from main()
    //
    // Here we test that AsynchronousOperation can behave like a synchronous
    // operation, which finishes synchronously from its main() method.

    func testAsynchronousOperationCanFinishImmediatelyWithSuccess() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                result = .success(1)
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .success(1), description: "Completion Block")
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishImmediatelyWithError() throws {
        struct TestError: Error { }
        class TestOperation: AsynchronousOperation<Int, TestError> {
            override func main() {
                result = .failure(TestError())
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .failure)
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishImmediatelyWithCancellation() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                cancel()
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .cancellation)
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    // MARK: - Finish asynchronously from main()
    //
    // Here we test that AsynchronousOperation can behave like a genuine
    // asynchronous operation, which is not finished when its main() method
    // exits.

    func testAsynchronousOperationCanFinishAsynchronouslyWithSuccess() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
                    self.result = .success(1)
                }
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .success(1))
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishAsynchronouslyWithError() throws {
        struct TestError: Error { }
        class TestOperation: AsynchronousOperation<Int, TestError> {
            override func main() {
                DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
                    self.result = .failure(TestError())
                }
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .failure)
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishAsynchronouslyWithCancellation() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
                    self.cancel()
                }
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .cancellation)
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    // MARK: - Finish after enqueue, but before start() is called
    //
    // Here we test that AsynchronousOperation can finish *after* it has been
    // enqueued, but *before* the operation queue has started the operation.
    //
    // We want applications to freely finish operations by cancelling them or
    // setting their results, but we know that OperationQueue is... picky.
    // So it's important to test specific schedulings.

    func testAsynchronousOperationCanFinishBeforeStartWithSuccess() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .success(1))
        waitForOperationCompletion(operation, afterEnqueue: {
            operation.result = .success(1)
        })
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishBeforeStartWithError() throws {
        struct TestError: Error { }
        class TestOperation: AsynchronousOperation<Int, TestError> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .failure)
        waitForOperationCompletion(operation, afterEnqueue: {
            operation.result = .failure(TestError())
        })
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishBeforeStartWithCancellation() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .cancellation)
        waitForOperationCompletion(operation, afterEnqueue: {
            operation.cancel()
        })
        waitForExpectations(timeout: 1, handler: nil)
    }

    // MARK: - Finish before enqueue
    //
    // Here we test that AsynchronousOperation can finish *before* it has been
    // enqueued.
    //
    // We want applications to freely finish operations by cancelling them or
    // setting their results, but we know that OperationQueue is... picky.
    // So it's important to test specific schedulings.

    func testAsynchronousOperationCanFinishBeforeEnqueueWithSuccess() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .success(1))
        operation.result = .success(1)
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishBeforeEnqueueWithError() throws {
        struct TestError: Error { }
        class TestOperation: AsynchronousOperation<Int, TestError> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .failure)
        operation.result = .failure(TestError())
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }

    func testAsynchronousOperationCanFinishBeforeEnqueueWithCancellation() throws {
        class TestOperation: AsynchronousOperation<Int, Error> {
            override func main() {
                XCTFail("Should not run")
            }
        }
        let operation = TestOperation()
        expectCompletion(of: operation, .cancellation)
        operation.cancel()
        waitForOperationCompletion(operation)
        waitForExpectations(timeout: 1, handler: nil)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import RxSwift | |
extension PrimitiveSequence where Trait == MaybeTrait {
    /// Returns a Maybe which runs the receiver inside an Operation of the
    /// specified operation queue.
    ///
    /// The Operation is created and added to the queue when the returned
    /// Maybe is subscribed. Disposing of that subscription cancels
    /// the Operation.
    ///
    /// The Operation wraps the receiver Maybe, and forwards its success,
    /// completion, or error to the returned Maybe.
    ///
    /// - parameter operationQueue: The `OperationQueue` the Operation is
    ///   added to.
    /// - parameter queue: The `DispatchQueue` on which the returned Maybe
    ///   emits its event. Defaults to `DispatchQueue.main`. When nil, the
    ///   event is emitted right within the Operation's completionBlock.
    func inOperationQueue(
        _ operationQueue: OperationQueue,
        observeOn queue: DispatchQueue? = DispatchQueue.main)
        -> Maybe<Element>
    {
        return Maybe.create { observer -> Disposable in
            let operation = MaybeOperation(self)
            operation.handleResult(onQueue: queue) { outcome in
                switch outcome {
                case .success(.some(let element)):
                    observer(.success(element))
                case .success(.none):
                    // The wrapped Maybe completed without any element.
                    observer(.completed)
                case .failure(let error):
                    observer(.error(error))
                }
            }
            operationQueue.addOperation(operation)
            return Disposables.create { [weak operation] in
                operation?.cancel()
            }
        }
    }
}
extension PrimitiveSequence where Trait == SingleTrait {
    /// Returns a Single which runs the receiver inside an Operation of the
    /// specified operation queue.
    ///
    /// The Operation is created and added to the queue when the returned
    /// Single is subscribed. Disposing of that subscription cancels
    /// the Operation.
    ///
    /// The Operation wraps the receiver Single, and forwards its success or
    /// error to the returned Single.
    ///
    /// - parameter operationQueue: The `OperationQueue` the Operation is
    ///   added to.
    /// - parameter queue: The `DispatchQueue` on which the returned Single
    ///   emits its event. Defaults to `DispatchQueue.main`. When nil, the
    ///   event is emitted right within the Operation's completionBlock.
    func inOperationQueue(
        _ operationQueue: OperationQueue,
        observeOn queue: DispatchQueue? = DispatchQueue.main)
        -> Single<Element>
    {
        return Single.create { observer -> Disposable in
            let operation = SingleOperation(self)
            operation.handleResult(onQueue: queue) { outcome in
                switch outcome {
                case .success(let element):
                    observer(.success(element))
                case .failure(let error):
                    observer(.error(error))
                }
            }
            operationQueue.addOperation(operation)
            return Disposables.create { [weak operation] in
                operation?.cancel()
            }
        }
    }
}
extension Completable {
    /// Returns a Completable which runs the receiver inside an Operation of
    /// the specified operation queue.
    ///
    /// The Operation is created and added to the queue when the returned
    /// Completable is subscribed. Disposing of that subscription cancels
    /// the Operation.
    ///
    /// The Operation wraps the receiver Completable, and forwards its
    /// completion or error to the returned Completable.
    ///
    /// - parameter operationQueue: The `OperationQueue` the Operation is
    ///   added to.
    /// - parameter queue: The `DispatchQueue` on which the returned
    ///   Completable emits its event. Defaults to `DispatchQueue.main`. When
    ///   nil, the event is emitted right within the
    ///   Operation's completionBlock.
    func inOperationQueue(
        _ operationQueue: OperationQueue,
        observeOn queue: DispatchQueue? = DispatchQueue.main)
        -> Completable
    {
        return Completable.create { observer -> Disposable in
            let operation = CompletableOperation(self)
            operation.handleResult(onQueue: queue) { outcome in
                switch outcome {
                case .failure(let error):
                    observer(.error(error))
                case .success:
                    observer(.completed)
                }
            }
            operationQueue.addOperation(operation)
            return Disposables.create { [weak operation] in
                operation?.cancel()
            }
        }
    }
}
// MARK: - Support Operations | |
/// Base class for operations that run an Rx subscription: `main()` stores the
/// disposable returned by `subscribe()`, and the disposable is disposed when
/// the operation is cancelled or deallocated.
private class DisposableOperation<T>: AsynchronousOperation<T, Error> {
    /// The running subscription, if any.
    private var disposable: Disposable?

    deinit {
        disposable?.dispose()
    }

    /// Starts the subscription. Subclasses must override.
    ///
    /// - returns: The disposable that ends the subscription.
    func subscribe() -> Disposable {
        fatalError("Abstract method")
    }

    override func main() {
        disposable = subscribe()
    }

    override func cancel() {
        // Cancel first. `AsynchronousOperation.start()` runs main() while
        // holding the operation lock, and `super.cancel()` acquires that same
        // lock: once it returns, a concurrently running main() has stored its
        // disposable. Reading `disposable` before that point could miss it,
        // and leave the subscription alive until deinit.
        super.cancel()
        disposable?.dispose()
        disposable = nil
    }
}
/// Support for Maybe.inOperationQueue(_:)
///
/// The result of the operation is `.success(nil)` when the Maybe completes
/// without any element.
private class MaybeOperation<Element>: DisposableOperation<Element?> {
    /// The wrapped Maybe.
    let maybe: Maybe<Element>

    init(_ maybe: Maybe<Element>) {
        self.maybe = maybe
    }

    /// Subscribes to the wrapped Maybe, and turns each of its events into
    /// the result of the operation.
    override func subscribe() -> Disposable {
        return maybe.subscribe(
            onSuccess: { [weak self] element in
                self?.result = .success(element)
            },
            onError: { [weak self] error in
                self?.result = .failure(error)
            },
            onCompleted: { [weak self] in
                self?.result = .success(nil)
            })
    }
}
/// Support for Single.inOperationQueue(_:)
private class SingleOperation<Element>: DisposableOperation<Element> {
    /// The wrapped Single.
    let single: Single<Element>

    init(_ single: Single<Element>) {
        self.single = single
    }

    /// Subscribes to the wrapped Single, and turns its event into the result
    /// of the operation.
    override func subscribe() -> Disposable {
        return single.subscribe(
            onSuccess: { [weak self] element in
                self?.result = .success(element)
            },
            onError: { [weak self] error in
                self?.result = .failure(error)
            })
    }
}
/// Support for Completable.inOperationQueue(_:)
private class CompletableOperation: DisposableOperation<Void> {
    /// The wrapped Completable.
    let completable: Completable

    init(_ completable: Completable) {
        self.completable = completable
    }

    /// Subscribes to the wrapped Completable, and turns its event into the
    /// result of the operation.
    override func subscribe() -> Disposable {
        return completable.subscribe(
            onCompleted: { [weak self] in
                self?.result = .success(())
            },
            onError: { [weak self] error in
                self?.result = .failure(error)
            })
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Foundation | |
extension SinglePublisher {
    /// Wraps the upstream publisher in an asynchronous operation.
    ///
    /// The operation subscribes to the upstream publisher when it starts,
    /// and completes with the result of the upstream publisher.
    ///
    /// Use `subscribe(on:options:)` when you need to control when the upstream
    /// publisher is subscribed:
    ///
    ///     let operation = upstreamPublisher
    ///         .subscribe(on: DispatchQueue.main)
    ///         .operation()
    func operation() -> SinglePublisherOperation<Self> {
        return SinglePublisherOperation(self)
    }

    /// Returns a publisher which, when subscribed, wraps the upstream
    /// publisher in an operation, and adds this operation to the provided
    /// operation queue.
    ///
    /// The operation subscribes to the upstream publisher when it starts,
    /// and completes with the result of the upstream publisher. The returned
    /// publisher completes with the operation.
    ///
    /// Use `subscribe(on:options:)` when you need to control when the upstream
    /// publisher is subscribed:
    ///
    ///     let publisher = upstreamPublisher
    ///         .subscribe(on: DispatchQueue.main)
    ///         .inOperationQueue(queue)
    ///
    /// Use `receive(on:options:)` when you need to control when the returned
    /// publisher publishes its elements and completion:
    ///
    ///     let publisher = upstreamPublisher
    ///         .inOperationQueue(queue)
    ///         .receive(on: DispatchQueue.main)
    ///
    /// - parameter operationQueue: The `OperationQueue` to run the publisher in.
    func inOperationQueue(_ operationQueue: OperationQueue)
        -> AsynchronousOperationPublisher<Self>
    {
        return AsynchronousOperationPublisher(
            upstream: self,
            operationQueue: operationQueue)
    }
}
// MARK: - AsynchronousOperationPublisher | |
/// A publisher which, for each subscriber, wraps its upstream publisher in an
/// operation, and adds this operation to an operation queue.
struct AsynchronousOperationPublisher<Upstream: SinglePublisher>: SinglePublisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure

    /// What a subscription needs in order to start.
    private struct Context {
        /// The publisher wrapped in the operation.
        let upstream: Upstream
        /// The queue the operation is added to.
        let queue: OperationQueue
    }

    // swiftlint:disable:next colon
    private class Subscription<Downstream: Subscriber>:
        TraitSubscriptions.Single<Downstream, Context>
        where
        Downstream.Input == Output,
        Downstream.Failure == Failure
    {
        /// The scheduled operation. Weak: the subscription does not keep
        /// the operation alive.
        private weak var operation: SinglePublisherOperation<Upstream>?

        /// Creates the operation, observes its completion, and adds it to
        /// the operation queue.
        override func start(with context: Context) {
            let scheduled = context.upstream.operation()
            scheduled.handleCompletion(onQueue: nil) { [weak self] outcome in
                guard let subscription = self else { return }
                switch outcome {
                case .none:
                    // No result: the operation was cancelled.
                    subscription.cancel()
                case .some(.success(let value)):
                    subscription.receive(.success(value))
                case .some(.failure(let error)):
                    subscription.receive(.failure(error))
                }
            }
            operation = scheduled
            context.queue.addOperation(scheduled)
        }

        /// Cancels the operation, if it is still alive.
        override func didCancel(with context: Context) {
            operation?.cancel()
        }
    }

    private let context: Context

    fileprivate init(
        upstream: Upstream,
        operationQueue: OperationQueue)
    {
        self.context = Context(upstream: upstream, queue: operationQueue)
    }

    func receive<S>(subscriber: S)
        where S: Subscriber, S.Failure == Self.Failure, S.Input == Self.Output
    {
        subscriber.receive(
            subscription: Subscription(downstream: subscriber, context: context))
    }
}
// MARK: - SinglePublisherOperation | |
/// An asynchronous operation which subscribes to a publisher when it starts,
/// and completes with the result of this publisher.
class SinglePublisherOperation<Upstream: SinglePublisher>: AsynchronousOperation<Upstream.Output, Upstream.Failure> {
    /// The wrapped publisher, until the operation starts or is cancelled.
    private var upstream: Upstream?

    /// The running subscription, if any.
    private var cancellable: AnyCancellable?

    fileprivate init(_ upstream: Upstream) {
        self.upstream = upstream
    }

    override func main() {
        guard let publisher = upstream else {
            // It can only get nil if operation was cancelled. Who would
            // call main() on a cancelled operation? Nobody.
            preconditionFailure("SingleOperation started without upstream publisher")
        }

        // Release memory once subscribed: the publisher is no longer needed.
        defer { upstream = nil }

        cancellable = publisher.sinkSingle { [weak self] outcome in
            self?.result = outcome
        }
    }

    override func cancel() {
        super.cancel()
        // Drop the publisher, then the subscription.
        upstream = nil
        cancellable = nil
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment