Skip to content

Instantly share code, notes, and snippets.

@groue
Last active October 30, 2020 13:11
Show Gist options
  • Save groue/6c6059fb89620dc93d0f4905c1f9cb69 to your computer and use it in GitHub Desktop.
Save groue/6c6059fb89620dc93d0f4905c1f9cb69 to your computer and use it in GitHub Desktop.
AsynchronousOperation
import Foundation
/// To create an operation:
///
/// 1. Subclass AsynchronousOperation, override main, and eventually cancel the
/// operation, or set result to a non-nil value.
///
/// 2. Use makeOperation { op in ... }, and eventually cancel the
/// operation, or set result to a non-nil value.
open class AsynchronousOperation<Output, Failure: Error>: Operation {
/// Setting the result to a non-nil value finishes the operation.
///
/// If operation is already finished, setting the result has no effect.
var result: Result<Output, Failure>? {
get { return _result }
set {
synchronized {
if _isFinished { return }
if _isEarlyFinished { return }
if _isExecuting {
willChangeValue(forKey: isExecutingKey)
willChangeValue(forKey: isFinishedKey)
_isExecuting = false
_result = newValue
_isFinished = true
didChangeValue(forKey: isFinishedKey)
didChangeValue(forKey: isExecutingKey)
} else {
_isEarlyFinished = true
_result = newValue
}
didComplete()
}
}
}
override open func cancel() {
synchronized {
if _isFinished { return }
if _isEarlyFinished { return }
if _isCancelled == false {
willChangeValue(forKey: isCancelledKey)
_isCancelled = true
didChangeValue(forKey: isCancelledKey)
}
if _isExecuting {
super.cancel()
willChangeValue(forKey: isExecutingKey)
willChangeValue(forKey: isFinishedKey)
_isExecuting = false
_isFinished = true
didChangeValue(forKey: isFinishedKey)
didChangeValue(forKey: isExecutingKey)
} else {
_isEarlyFinished = true
}
didComplete()
}
}
/// Updates the `completionBlock` of the operation so that it performs the
/// provided result handler.
///
/// The result handler is executed when the operation completes
/// successfully, completes with an error, or is cancelled.
///
/// The result handler is executed on the specified dispatch queue, which
/// defaults to `DispatchQueue.main`. When the queue is nil, the result
/// handler is executed right within the operation's `completionBlock`.
///
/// The result handler can refer to the operation without preventing the
/// operation to deallocate after completion.
///
/// - parameter queue: The `DispatchQueue` which runs the result handler.
/// When the queue is nil, the completion block is executed right within
/// the operation's `completionBlock`.
/// - parameter resultHandler: A function that runs when the operation
/// is finished.
/// - parameter result: The result of the operation. If nil, the
/// operation was cancelled.
public func handleCompletion(
onQueue queue: DispatchQueue? = DispatchQueue.main,
result resultHandler: @escaping (_ result: Result<Output, Failure>?) -> Void)
{
completionBlock = { [unowned self] in
let result = self.result
assert(result != nil || self.isCancelled)
if let queue = queue {
queue.async {
resultHandler(result)
}
} else {
resultHandler(result)
}
}
}
/// Updates the `completionBlock` of the operation so that it performs the
/// provided result handler.
///
/// The result handler is executed when the operation completes
/// successfully, completes with an error, but not when the operation
/// is cancelled.
///
/// The result handler is executed on the specified dispatch queue, which
/// defaults to `DispatchQueue.main`. When the queue is nil, the result
/// handler is executed right within the operation's `completionBlock`.
///
/// The result handler can refer to the operation without preventing the
/// operation to deallocate after completion.
///
/// - parameter queue: The `DispatchQueue` which runs the result handler.
/// When the queue is nil, the completion block is executed right within
/// the operation's `completionBlock`.
/// - parameter resultHandler: A function that runs when the operation
/// is finished.
/// - parameter result: The result of the operation.
public func handleResult(
onQueue queue: DispatchQueue? = DispatchQueue.main,
with resultHandler: @escaping (_ result: Result<Output, Failure>) -> Void)
{
handleCompletion(onQueue: queue) { result in
if let result = result {
resultHandler(result)
}
}
}
public static func blockOperation(_ block: @escaping (AsynchronousOperation<Output, Failure>) -> Void) -> AsynchronousOperation<Output, Failure> {
return AsynchronousBlockOperation(block)
}
/// Don't override. Override main() instead.
override public func start() {
synchronized {
if _isEarlyFinished {
willChangeValue(forKey: isFinishedKey)
_isFinished = true
didChangeValue(forKey: isFinishedKey)
} else {
willChangeValue(forKey: isExecutingKey)
_isExecuting = true
didChangeValue(forKey: isExecutingKey)
main()
}
}
}
/// Called after the result has been set, or operation was cancelled.
///
/// Subclasses can override this method. Default implementation
/// does nothing.
///
/// - warning: Don't assume the `isFinished` flag is set, because
/// an asynchronous operation may get a result, or be cancelled, before it
/// is scheduled in an operation queue.
open func didComplete() { }
override open var isAsynchronous: Bool { return true }
override open var isExecuting: Bool { return _isExecuting }
override open var isFinished: Bool { return _isFinished }
override open var isCancelled: Bool { return _isCancelled }
private var _result: Result<Output, Failure>? = nil
private var lock = NSRecursiveLock()
private var _isCancelled: Bool = false
private var _isExecuting: Bool = false
private var _isFinished: Bool = false
/// Set to true whenever the operation is completed before the operation
/// queue has called the start() method, and the operation has become
/// "executing". We have to wait until start() is called before we trigger
/// the "isFinished" KVO notifications, or we get warnings in the console,
/// and even hard crashes.
private var _isEarlyFinished: Bool = false
private func synchronized<T>(_ execute: () throws -> T) rethrows -> T {
lock.lock()
defer { lock.unlock() }
return try execute()
}
}
private let isCancelledKey = "isCancelled"
private let isExecutingKey = "isExecuting"
private let isFinishedKey = "isFinished"
private class AsynchronousBlockOperation<Output, Failure: Error>: AsynchronousOperation<Output, Failure> {
let block: (AsynchronousOperation<Output, Failure>) -> Void
init(_ block: @escaping (AsynchronousOperation<Output, Failure>) -> Void) {
self.block = block
}
override func main() {
block(self)
}
}
extension OperationQueue {
public convenience init(
name: String? = nil,
qualityOfService: QualityOfService,
maxConcurrentOperationCount: Int = OperationQueue.defaultMaxConcurrentOperationCount)
{
self.init()
self.name = name
self.qualityOfService = qualityOfService
self.maxConcurrentOperationCount = maxConcurrentOperationCount
}
}
import XCTest
class AsynchronousOperationTests: XCTestCase {
// TODO: add tests for start() called without any OperationQueue.
// TODO: do we have to do anything regarding the isReady flag?
/// Waits for the operation to complete. This tests makes sure the operation
/// is well behaved and correctly notifies its completion to an
/// operation queue.
///
/// - parameter afterEnqueue: a function to execute after the operation
/// has been added to an operation queue, but before the operation has
/// started.
private func waitForOperationCompletion(_ operation: Operation, afterEnqueue: (() -> Void)? = nil) {
/// Makes sure the afterEnqueue function is called before the tested
/// operation starts
let beforeSemaphore = DispatchSemaphore(value: 0)
let beforeOperation = BlockOperation {
beforeSemaphore.wait()
}
operation.addDependency(beforeOperation)
/// Makes sure the operation completes "normally", according to
/// OperationQueue, by waiting for a dependent operation.
let expectation = self.expectation(description: "Operation Completion")
let finishOperation = BlockOperation {
expectation.fulfill()
}
finishOperation.addDependency(operation)
let queue = OperationQueue()
queue.addOperation(beforeOperation)
queue.addOperation(operation)
queue.addOperation(finishOperation)
afterEnqueue?()
beforeSemaphore.signal()
wait(for: [expectation], timeout: 1)
}
// MARK: - Finish synchronously from main()
//
// Here we test that AsynchronousOperation can behave like a synchronous
// operation, which finishes synchronously from its main() method.
func testAsynchronousOperationCanFinishImmediatelyWithSuccess() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
result = .success(1)
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion Block")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case let .success(value):
XCTAssertEqual(value, 1)
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishImmediatelyWithError() throws {
struct TestError: Error { }
class TestOperation: AsynchronousOperation<Int, TestError> {
override func main() {
result = .failure(TestError())
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case .success:
XCTFail("Unexpected value")
case .failure:
break
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishImmediatelyWithCancellation() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
cancel()
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
break
case .success:
XCTFail("Unexpected value")
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
// MARK: - Finish asynchronously from main()
//
// Here we test that AsynchronousOperation can behave like an genuine
// asynchronous operation, which is not finished when its main() method
// exits.
func testAsynchronousOperationCanFinishAsynchronouslyWithSuccess() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
self.result = .success(1)
}
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case let .success(value):
XCTAssertEqual(value, 1)
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishAsynchronouslyWithError() throws {
struct TestError: Error { }
class TestOperation: AsynchronousOperation<Int, TestError> {
override func main() {
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
self.result = .failure(TestError())
}
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case .success:
XCTFail("Unexpected value")
case .failure:
break
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishAsynchronouslyWithCancellation() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + .milliseconds(100)) {
self.cancel()
}
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
break
case .success:
XCTFail("Unexpected value")
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
// MARK: - Finish after enqueue, but before start() is called
//
// Here we test that AsynchronousOperation can finish *after* it has been
// enqueued, but *before* the operation queue has started the operation.
//
// We want applications to freely finish operations by cancelling them or
// setting their results, but we know that OperationQueue is... picky.
// So it's important to test specific schedulings.
func testAsynchronousOperationCanFinishBeforeStartWithSuccess() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case let .success(value):
XCTAssertEqual(value, 1)
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation, afterEnqueue: {
operation.result = .success(1)
})
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishBeforeStartWithError() throws {
struct TestError: Error { }
class TestOperation: AsynchronousOperation<Int, TestError> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case .success:
XCTFail("Unexpected value")
case .failure:
break
}
expectation.fulfill()
}
waitForOperationCompletion(operation, afterEnqueue: {
operation.result = .failure(TestError())
})
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishBeforeStartWithCancellation() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
break
case .success:
XCTFail("Unexpected value")
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
waitForOperationCompletion(operation, afterEnqueue: {
operation.cancel()
})
waitForExpectations(timeout: 1, handler: nil)
}
// MARK: - Finish before enqueue
//
// Here we test that AsynchronousOperation can finish *before* it has been
// enqueued.
//
// We want applications to freely finish operations by cancelling them or
// setting their results, but we know that OperationQueue is... picky.
// So it's important to test specific schedulings.
func testAsynchronousOperationCanFinishBeforeEnqueueWithSuccess() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case let .success(value):
XCTAssertEqual(value, 1)
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
operation.result = .success(1)
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishBeforeEnqueueWithError() throws {
struct TestError: Error { }
class TestOperation: AsynchronousOperation<Int, TestError> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
XCTFail("Expected result")
case .success:
XCTFail("Unexpected value")
case .failure:
break
}
expectation.fulfill()
}
operation.result = .failure(TestError())
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
func testAsynchronousOperationCanFinishBeforeEnqueueWithCancellation() throws {
class TestOperation: AsynchronousOperation<Int, Error> {
override func main() {
XCTFail("Should not run")
}
}
let operation = TestOperation()
let expectation = self.expectation(description: "Completion")
operation.handleCompletion { result in
XCTAssertTrue(Thread.isMainThread, "completion should be handled from the main thread")
switch result {
case nil:
break
case .success:
XCTFail("Unexpected value")
case .failure:
XCTFail("Unexpected error")
}
expectation.fulfill()
}
operation.cancel()
waitForOperationCompletion(operation)
waitForExpectations(timeout: 1, handler: nil)
}
}
import Foundation
import RxSwift
extension PrimitiveSequence where Trait == MaybeTrait {
/// Wraps the receiver Maybe in an Operation scheduled in the specified
/// operation queue.
///
/// The Operation in added to the queue when the returned Maybe
/// is subscribed.
///
/// The receiver Maybe is subscribed when the Operation starts.
///
/// The Operation completes when the receiver Maybe completes.
///
/// The returned Maybe completes on the specified dispatch queue,
/// which defaults to `DispatchQueue.main`. When the queue is nil, the
/// completion block is executed right within the
/// Operation's completionBlock.
func inOperationQueue(
_ operationQueue: OperationQueue,
observeOn queue: DispatchQueue? = DispatchQueue.main)
-> Maybe<Element>
{
Maybe.create { event -> Disposable in
let op = MaybeOperation(self)
op.handleResult(onQueue: queue) { result in
switch result {
case let .success(element):
if let element = element {
event(.success(element))
} else {
event(.completed)
}
case let .failure(error):
event(.error(error))
}
}
operationQueue.addOperation(op)
return Disposables.create { [weak op] in
op?.cancel()
}
}
}
}
extension PrimitiveSequence where Trait == SingleTrait {
/// Wraps the receiver Single in an Operation scheduled in the specified
/// operation queue.
///
/// The Operation in added to the queue when the returned Single
/// is subscribed.
///
/// The receiver Single is subscribed when the Operation starts.
///
/// The Operation completes when the receiver Single completes.
///
/// The returned Single completes on the specified dispatch queue,
/// which defaults to `DispatchQueue.main`. When the queue is nil, the
/// completion block is executed right within the
/// Operation's completionBlock.
func inOperationQueue(
_ operationQueue: OperationQueue,
observeOn queue: DispatchQueue? = DispatchQueue.main)
-> Single<Element>
{
Single.create { event -> Disposable in
let op = SingleOperation(self)
op.handleResult(onQueue: queue) { result in
switch result {
case let .success(element):
event(.success(element))
case let .failure(error):
event(.error(error))
}
}
operationQueue.addOperation(op)
return Disposables.create { [weak op] in
op?.cancel()
}
}
}
}
extension Completable {
/// Wraps the receiver Completable in an Operation scheduled in the
/// specified operation queue.
///
/// The Operation in added to the queue when the returned Completable
/// is subscribed.
///
/// The receiver Completable is subscribed when the Operation starts.
///
/// The Operation completes when the receiver Completable completes.
///
/// The returned Completable completes on the specified dispatch queue,
/// which defaults to `DispatchQueue.main`. When the queue is nil, the
/// completion block is executed right within the
/// Operation's completionBlock.
func inOperationQueue(
_ operationQueue: OperationQueue,
observeOn queue: DispatchQueue? = DispatchQueue.main)
-> Completable
{
Completable.create { event -> Disposable in
let op = CompletableOperation(self)
op.handleResult(onQueue: queue) { result in
switch result {
case .success:
event(.completed)
case let .failure(error):
event(.error(error))
}
}
operationQueue.addOperation(op)
return Disposables.create { [weak op] in
op?.cancel()
}
}
}
}
// MARK: - Support Operations
private class DisposableOperation<T>: AsynchronousOperation<T, Error> {
private var disposable: Disposable?
deinit {
disposable?.dispose()
}
func subscribe() -> Disposable {
fatalError("Abstract method")
}
override func main() {
disposable = subscribe()
}
override func cancel() {
disposable?.dispose()
disposable = nil
super.cancel()
}
}
/// Support for Maybe.inOperationQueue(_:)
private class MaybeOperation<Element>: DisposableOperation<Element?> {
let maybe: Maybe<Element>
init(_ maybe: Maybe<Element>) {
self.maybe = maybe
}
override func subscribe() -> Disposable {
maybe.subscribe(
onSuccess: { [weak self] in self?.result = .success($0) },
onError: { [weak self] in self?.result = .failure($0) },
onCompleted: { [weak self] in self?.result = .success(nil) })
}
}
/// Support for Single.inOperationQueue(_:)
private class SingleOperation<Element>: DisposableOperation<Element> {
let single: Single<Element>
init(_ single: Single<Element>) {
self.single = single
}
override func subscribe() -> Disposable {
single.subscribe(
onSuccess: { [weak self] in self?.result = .success($0) },
onError: { [weak self] in self?.result = .failure($0) })
}
}
/// Support for Completable.inOperationQueue(_:)
private class CompletableOperation: DisposableOperation<Void> {
let completable: Completable
init(_ completable: Completable) {
self.completable = completable
}
override func subscribe() -> Disposable {
completable.subscribe(
onCompleted: { [weak self] in self?.result = .success(()) },
onError: { [weak self] in self?.result = .failure($0) })
}
}
import Combine
import Foundation
extension SinglePublisher {
/// Returns an asynchronous operation that wraps the upstream publisher.
///
/// The uptream publisher is subscribed when the operation starts. The
/// operation completes when the uptream publisher completes.
///
/// Use `subscribe(on:options:)` when you need to control when the upstream
/// publisher is subscribed:
///
/// let operation = upstreamPublisher
/// .subscribe(on: DispatchQueue.main)
/// .operation()
func operation() -> SinglePublisherOperation<Self> {
SinglePublisherOperation(self)
}
/// Returns a publisher which, on subscription, wraps the upstream publisher
/// in an operation, and adds the operation to the provided operation queue.
///
/// The uptream publisher is subscribed when the operation starts. The
/// operation completes when the uptream publisher completes. The returned
/// publisher completes with the operation.
///
/// Use `subscribe(on:options:)` when you need to control when the upstream
/// publisher is subscribed:
///
/// let publisher = upstreamPublisher
/// .subscribe(on: DispatchQueue.main)
/// .inOperationQueue(queue)
///
/// Use `receive(on:options:)` when you need to control when the returned
/// publisher publishes its elements and completion:
///
/// let publisher = upstreamPublisher
/// .inOperationQueue(queue)
/// .receive(on: DispatchQueue.main)
///
/// - parameter operationQueue: The `OperationQueue` to run the publisher in.
func inOperationQueue(_ operationQueue: OperationQueue)
-> AsynchronousOperationPublisher<Self>
{
AsynchronousOperationPublisher(
upstream: self,
operationQueue: operationQueue)
}
}
// MARK: - AsynchronousOperationPublisher
/// A publisher that runs an asynchronous operation.
struct AsynchronousOperationPublisher<Upstream: SinglePublisher>: SinglePublisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private struct Context {
let upstream: Upstream
let queue: OperationQueue
}
// swiftlint:disable:next colon
private class Subscription<Downstream: Subscriber>:
TraitSubscriptions.Single<Downstream, Context>
where
Downstream.Input == Output,
Downstream.Failure == Failure
{
private weak var operation: SinglePublisherOperation<Upstream>?
override func start(with context: Context) {
let operation = context.upstream.operation()
operation.handleCompletion(onQueue: nil) { [weak self] result in
guard let self = self else { return }
switch result {
case nil:
self.cancel()
case let .success(value):
self.receive(.success(value))
case let .failure(error):
self.receive(.failure(error))
}
}
self.operation = operation
context.queue.addOperation(operation)
}
override func didCancel(with context: Context) {
operation?.cancel()
}
}
private let context: Context
fileprivate init(
upstream: Upstream,
operationQueue: OperationQueue)
{
context = Context(upstream: upstream, queue: operationQueue)
}
func receive<S>(subscriber: S)
where S: Subscriber, S.Failure == Self.Failure, S.Input == Self.Output
{
let subscription = Subscription(downstream: subscriber, context: context)
subscriber.receive(subscription: subscription)
}
}
// MARK: - SinglePublisherOperation
/// An operation that runs a publisher.
class SinglePublisherOperation<Upstream: SinglePublisher>: AsynchronousOperation<Upstream.Output, Upstream.Failure> {
private var upstream: Upstream?
private var cancellable: AnyCancellable?
fileprivate init(_ upstream: Upstream) {
self.upstream = upstream
}
override func main() {
guard let upstream = upstream else {
// It can only get nil if operation was cancelled. Who would
// call main() on a cancelled operation? Nobody.
preconditionFailure("SingleOperation started without upstream publisher")
}
cancellable = upstream.sinkSingle { [weak self] result in
self?.result = result
}
// Release memory
self.upstream = nil
}
override func cancel() {
super.cancel()
upstream = nil
cancellable = nil
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment