Last active
March 10, 2021 15:54
-
-
Save KaQuMiQ/65fab5909b38c8b2e0c24be4ca70006a to your computer and use it in GitHub Desktop.
Futura3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import struct Foundation.TimeInterval | |
| import struct Foundation.Date | |
| import class Atomics.ManagedAtomic | |
/// A token used to cancel associated tasks and to observe whether they were canceled.
///
/// The token is a thin wrapper around two closures: one reporting the
/// current status and one performing the cancelation.
public struct Cancelation {
    /// Returns `true` when the token should be treated as canceled.
    private var status: () -> Bool
    /// Executed every time `cancel()` is called on the token.
    private var cancelation: () -> Void

    /// Memberwise initializer, spelled out explicitly.
    /// - Parameters:
    ///   - status: closure reporting the cancelation status.
    ///   - cancelation: closure performing the cancelation.
    private init(
        status: @escaping () -> Bool,
        cancelation: @escaping () -> Void
    ) {
        self.status = status
        self.cancelation = cancelation
    }
}
public extension Cancelation {
    /// Current cancelation status, read from the underlying status closure on every access.
    var isCanceled: Bool {
        return status()
    }

    /// Runs the underlying cancelation closure.
    func cancel() {
        cancelation()
    }
}
public extension Cancelation {
    /// Token which becomes canceled only when `cancel()` is called explicitly.
    static var manual: Self {
        let state = ManagedAtomic<Bool>(false)
        return Self(
            status: { state.load(ordering: .sequentiallyConsistent) },
            cancelation: { state.store(true, ordering: .sequentiallyConsistent) }
        )
    }

    /// Token which becomes canceled when `cancel()` is called explicitly.
    /// Executes the given closure on the first call to `cancel()`; later calls do nothing.
    /// - Parameter closure: work executed once, on the first manual cancelation.
    /// - warning: You have to ensure that closure execution is thread safe.
    static func withClosure(
        _ closure: @escaping () -> Void
    ) -> Self {
        let state = ManagedAtomic<Bool>(false)
        var pendingClosure: (() -> Void)? = closure
        return Self(
            status: { state.load(ordering: .sequentiallyConsistent) },
            cancelation: {
                // `exchange` returns the previous value, so only the first caller gets past the guard.
                guard !state.exchange(true, ordering: .sequentiallyConsistent) else { return }
                pendingClosure?()
                // Drop the closure after use so whatever it captured can be released.
                pendingClosure = nil
            }
        )
    }

    /// Token which becomes canceled when `cancel()` is called explicitly
    /// or automatically once the timeout passes.
    /// - Parameter timeInterval: timeout counted from now; must be greater than zero.
    static func withTimeout(_ timeInterval: TimeInterval) -> Self {
        precondition(timeInterval > 0, "Cannot make timeout now or in past")
        return withDeadline(Date(timeIntervalSinceNow: timeInterval))
    }

    /// Token which becomes canceled when `cancel()` is called explicitly
    /// or automatically after passing the deadline.
    ///
    /// No timer is involved: the deadline is compared with `now()` each time
    /// `isCanceled` is read.
    /// - Parameters:
    ///   - date: deadline; expected to be later than `now()` (checked with `assert`).
    ///   - now: provider of the current date, `Date.init` by default.
    static func withDeadline(
        _ date: Date,
        now: @escaping () -> Date = Date.init
    ) -> Self {
        assert(date.timeIntervalSince(now()) > 0, "Cannot make deadline now or in past")
        let state = ManagedAtomic<Bool>(false)
        return Self(
            status: { state.load(ordering: .sequentiallyConsistent) || date.timeIntervalSince(now()) <= 0 },
            cancelation: { state.store(true, ordering: .sequentiallyConsistent) }
        )
    }

    /// Token which combines given tokens.
    /// Calling `cancel()` cancels all of the given tokens.
    /// Is canceled as soon as any of the given tokens is canceled.
    static func combined(_ tokens: Cancelation...) -> Self {
        combined(tokens)
    }

    /// Token which combines given tokens.
    /// Calling `cancel()` cancels all of the given tokens.
    /// Is canceled as soon as any of the given tokens is canceled.
    /// - Parameter tokens: tokens to combine; more than one is expected (checked with `assert`).
    static func combined(_ tokens: [Cancelation]) -> Self {
        assert(tokens.count > 1, "Cannot combine single or no token")
        return Self(
            status: { tokens.contains(where: { $0.isCanceled }) },
            cancelation: { tokens.forEach { $0.cancel() } }
        )
    }

    /// Token which will never become canceled; calling `cancel()` on it does nothing.
    /// - note: When used with `combined` it never contributes to the canceled
    /// status; the combined token can still become canceled through its other tokens.
    static var never: Self {
        Self(
            status: { false },
            cancelation: {}
        )
    }

    /// Token which is always canceled; calling `cancel()` on it does nothing.
    static var canceled: Self {
        Self(
            status: { true },
            cancelation: {}
        )
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import class Dispatch.DispatchQueue | |
| import struct Dispatch.DispatchTime | |
| import enum Dispatch.DispatchTimeInterval | |
| import struct Foundation.TimeInterval | |
/// Wrapper around a closure which decides how and when a task gets executed.
public struct Scheduler {
    /// Scheduling closure. Receives the requested delay and the task to execute.
    public var scheduler: (_ delay: TimeInterval, _ task: @escaping () -> Void) -> Void

    /// Creates a scheduler backed by the given closure.
    /// - Parameter schedule: closure receiving the requested delay and the task to execute.
    public init(
        _ schedule: @escaping (_ delay: TimeInterval, _ task: @escaping () -> Void) -> Void
    ) {
        scheduler = schedule
    }
}
public extension Scheduler {
    /// Passes the task to the underlying scheduling closure.
    /// - Parameters:
    ///   - delay: requested delay, zero by default.
    ///   - task: work to execute.
    func schedule(after delay: TimeInterval = 0, _ task: @escaping () -> Void) {
        self.scheduler(delay, task)
    }
}
public extension Scheduler {
    /// Scheduler executing tasks asynchronously on the given dispatch queue.
    ///
    /// A zero delay uses `async`; any other delay is converted to whole
    /// milliseconds and uses `asyncAfter`.
    /// - Parameter queue: queue on which tasks are executed.
    static func dispatchQueue(_ queue: DispatchQueue) -> Self {
        Self { delay, task in
            guard !delay.isZero else {
                queue.async { task() }
                return
            }
            // NOTE(review): `Int(_:)` traps when `delay * 1000` is not finite
            // or does not fit in `Int` - confirm callers never pass such delays.
            let interval = DispatchTimeInterval.milliseconds(Int(delay * 1000))
            let deadline = DispatchTime.now().advanced(by: interval)
            queue.asyncAfter(deadline: deadline) { task() }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Source of values of type `Value` which may fail with `Failure`.
///
/// A signal wraps a single closure. Requesting the signal executes that
/// closure with the demand, a cancelation token and two callbacks.
public struct Signal<Value, Failure: Error> {
    /// Closure executed on each request. It receives the demand, the
    /// cancelation token, a callback for produced values and a callback
    /// for the completion result.
    var source: (
        _ demand: SignalDemand,
        _ cancelation: Cancelation,
        _ output: @escaping (Value) -> Void,
        _ completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Void

    /// Creates a signal from its source closure.
    init(
        source: @escaping (
            _ demand: SignalDemand,
            _ cancelation: Cancelation,
            _ output: @escaping (Value) -> Void,
            _ completion: @escaping (SignalDemandResult<Failure>) -> Void
        ) -> Void
    ) {
        self.source = source
    }
}
public extension Signal {
    /// Executes the signal source with the given demand and callbacks.
    /// - Parameters:
    ///   - demand: requested demand, `.unlimited` by default.
    ///   - cancelation: token passed to the source, a fresh `.manual` token by default.
    ///   - output: callback passed to the source for produced values.
    ///   - completion: callback passed to the source for the completion result.
    /// - Returns: the token passed as `cancelation`.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        output: @escaping (Value) -> Void,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        let token = cancelation
        self.source(demand, token, output, completion)
        return token
    }
}
public extension Signal where Value == Void {
    /// Variant of `request` for signals of `Void`, taking an output callback without arguments.
    /// - Returns: the token passed as `cancelation`.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        output: @escaping () -> Void,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        // The one-argument closure selects the generic `request` overload.
        return self.request(
            demand,
            cancelation: cancelation,
            output: { _ in output() },
            completion: completion
        )
    }
}
public extension Signal where Value == Never {
    /// Variant of `request` for signals of `Never`, which takes only a completion callback.
    /// - Returns: the token passed as `cancelation`.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        // Forwards to the generic `request` with an output callback that does nothing.
        return self.request(
            demand,
            cancelation: cancelation,
            output: { _ in },
            completion: completion
        )
    }
}
| import class Foundation.NSRecursiveLock | |
| import class Atomics.ManagedAtomic | |
public extension Signal {
    /// Signal which produces no values and completes immediately:
    /// with `.canceled` when the token is already canceled, with `.finished` otherwise.
    static var finished: Self {
        Self { _, cancelation, _, completion in
            completion(cancelation.isCanceled ? .canceled : .finished)
        }
    }

    /// Signal which produces no values and completes immediately:
    /// with `.canceled` when the token is already canceled, with `.failed(error)` otherwise.
    static func failed(error: Failure) -> Self {
        Self { _, cancelation, _, completion in
            guard !cancelation.isCanceled else { return completion(.canceled) }
            completion(.failed(error))
        }
    }

    /// Signal which produces the given value and finishes.
    /// Produces nothing when the demand is `.upTo(0)`.
    static func just(_ value: Value) -> Self {
        Self { demand, cancelation, output, completion in
            guard !cancelation.isCanceled else { return completion(.canceled) }
            if case .upTo(0) = demand {
                return completion(.finished)
            }
            output(value)
            completion(.finished)
        }
    }

    /// Signal which produces elements of the given sequence and finishes.
    ///
    /// The cancelation token is checked before each produced element. With
    /// `.upTo` demand no more elements than demanded are pulled from the sequence.
    static func sequence<S>(
        _ values: S
    ) -> Self
    where S: Sequence, S.Element == Value {
        Self { demand, cancelation, output, completion in
            var iterator = values.makeIterator()
            switch demand {
            case .unlimited:
                while let next = iterator.next() {
                    guard !cancelation.isCanceled else { return completion(.canceled) }
                    output(next)
                }
            case var .upTo(remaining):
                // The demand is checked first so the iterator is never advanced
                // past the demanded count (matters for single-pass sequences).
                while remaining > 0, let next = iterator.next() {
                    guard !cancelation.isCanceled else { return completion(.canceled) }
                    output(next)
                    remaining -= 1
                }
            }
            completion(.finished)
        }
    }

    /// Signal which produces the value returned by `load` and finishes.
    ///
    /// The loaded value is cached and reused by later requests. `load` is
    /// executed while holding a recursive lock guarding the cache.
    static func lazy(
        _ load: @escaping () -> Value
    ) -> Self {
        let cacheLock = NSRecursiveLock() // we might also try out RWLock or plain mutex here
        var cachedValue: Value? = nil
        /// Returns the cached value, loading and caching it when missing.
        func cachedOrLoadedValue() -> Value {
            cacheLock.lock()
            defer { cacheLock.unlock() }
            if let cached = cachedValue {
                return cached
            }
            let loaded = load()
            cachedValue = loaded
            return loaded
        }
        return Self { demand, cancelation, output, completion in
            guard !cancelation.isCanceled else { return completion(.canceled) }
            if case .upTo(0) = demand {
                return completion(.finished)
            }
            let value = cachedOrLoadedValue()
            // Loading might take a while, so the token is checked once more.
            guard !cancelation.isCanceled else { return completion(.canceled) }
            output(value)
            completion(.finished)
        }
    }

    /// Signal which produces the first value passed to the callback given to
    /// `deferred` and then finishes. Later callback calls are ignored.
    /// `deferred` is not executed when the demand is `.upTo(0)`.
    static func deferred(
        _ deferred: @escaping (@escaping (Value) -> Void) -> Void
    ) -> Self {
        Self { demand, cancelation, output, completion in
            let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
            /// Delivers the completion only for the first call.
            func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                completion(completionResult)
            }
            guard !cancelation.isCanceled else { return completeOnce(.canceled) }
            if case .upTo(0) = demand {
                return completeOnce(.finished)
            }
            deferred { value in
                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                // Claim the completion before producing the value so only one value gets through.
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                output(value)
                completion(.finished)
            }
        }
    }

    /// Signal which produces the value or fails with the error of the first
    /// result passed to the callback given to `future`. Later callback calls are ignored.
    /// `future` is not executed when the demand is `.upTo(0)`.
    static func future(
        _ future: @escaping (@escaping (Result<Value, Failure>) -> Void) -> Void
    ) -> Self {
        Self { demand, cancelation, output, completion in
            let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
            /// Delivers the completion only for the first call.
            func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                completion(completionResult)
            }
            guard !cancelation.isCanceled else { return completeOnce(.canceled) }
            if case .upTo(0) = demand {
                return completeOnce(.finished)
            }
            future { result in
                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                switch result {
                case let .success(value):
                    // Claim the completion before producing the value so only one value gets through.
                    guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                    output(value)
                    completion(.finished)
                case let .failure(error):
                    completeOnce(.failed(error))
                }
            }
        }
    }

    /// Signal which produces values passed to the `output` callback given to
    /// `stream` until the `close` callback is called.
    ///
    /// Calling `close` with an error fails the signal, calling it with `nil`
    /// finishes it. With `.upTo` demand at most the demanded number of values
    /// is produced; the signal finishes when a value arrives after the demand
    /// was used up. Nothing is produced after the signal has completed.
    static func stream(
        _ stream: @escaping (
            _ cancelation: Cancelation,
            _ output: @escaping (Value) -> Void,
            _ close: @escaping (Optional<Failure>) -> Void
        ) -> Void
    ) -> Self {
        Self { demand, cancelation, output, completion in
            let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
            /// Delivers the completion only for the first call.
            func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                completion(completionResult)
            }
            /// Translates the closing of the stream into the completion result.
            func handleClose(_ closing: Failure?) {
                if let error = closing {
                    completeOnce(.failed(error))
                } else {
                    completeOnce(.finished)
                }
            }
            guard !cancelation.isCanceled else { return completeOnce(.canceled) }
            switch demand {
            case let .upTo(count):
                let remainingCount = ManagedAtomic<UInt>(count)
                stream(
                    cancelation,
                    { value in
                        guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                        // Checked before touching the counter: the counter wraps around
                        // once it is used up, which would let later values through again.
                        guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                        guard remainingCount.loadThenWrappingDecrement(ordering: .sequentiallyConsistent) > 0
                        else { return completeOnce(.finished) }
                        output(value)
                    },
                    handleClose
                )
            case .unlimited:
                stream(
                    cancelation,
                    { value in
                        guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                        // Values arriving after the completion are dropped.
                        guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                        output(value)
                    },
                    handleClose
                )
            }
        }
    }
}
public extension Signal {
    /// Signal which produces values of this signal converted with `transform`.
    /// Demand, cancelation and completion are passed through unchanged.
    func map<NewValue>(
        _ transform: @escaping (Value) -> NewValue
    ) -> Signal<NewValue, Failure> {
        Signal<NewValue, Failure> { demand, cancelation, output, completion in
            self.source(
                demand,
                cancelation,
                { value in output(transform(value)) },
                completion
            )
        }
    }

    /// Signal which converts each value of this signal into an inner signal
    /// and produces values of all inner signals.
    ///
    /// This signal is always requested with `.unlimited` demand; the demand
    /// limits the values produced by the inner signals. The result finishes
    /// when this signal and all inner signals have ended, and fails as soon
    /// as this signal or any inner signal fails.
    func flatMap<NewValue>(
        _ transform: @escaping (Value) -> Signal<NewValue, Failure>
    ) -> Signal<NewValue, Failure> {
        Signal<NewValue, Failure> { demand, cancelation, output, completion in
            let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
            /// Delivers the completion only for the first call.
            func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                completion(completionResult)
            }
            // Number of parts still running: one for this (outer) signal plus one per live inner signal.
            let pendingParts = ManagedAtomic<Int>(1)
            /// Marks one part as ended and completes when it was the last one.
            /// An inner signal ending with cancelation does not stop the others,
            /// but when it is the last part a completion still has to be delivered,
            /// since nothing else would deliver it.
            func partEnded(canceled: Bool) {
                guard pendingParts.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return }
                completeOnce(canceled && cancelation.isCanceled ? .canceled : .finished)
            }
            // Counter of values still allowed to be produced, `nil` for unlimited demand.
            let remainingCount: ManagedAtomic<UInt>?
            switch demand {
            case let .upTo(count):
                remainingCount = ManagedAtomic<UInt>(count)
            case .unlimited:
                remainingCount = nil
            }
            self.source(
                .unlimited,
                cancelation,
                { value in
                    guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                    guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                    pendingParts.wrappingIncrement(ordering: .sequentiallyConsistent)
                    transform(value)
                        .source(
                            demand,
                            cancelation,
                            { innerValue in
                                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                                if let remainingCount = remainingCount {
                                    guard remainingCount.loadThenWrappingDecrement(ordering: .sequentiallyConsistent) > 0
                                    else { return completeOnce(.finished) }
                                }
                                guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                                output(innerValue)
                            },
                            { innerResult in
                                switch innerResult {
                                case .finished:
                                    partEnded(canceled: false) // we allow outer signal to be alive if there are no more values from inner signal
                                case .canceled:
                                    partEnded(canceled: true) // we allow outer signal to be alive if inner signal was canceled
                                case let .failed(error):
                                    completeOnce(.failed(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes
                                }
                            }
                        )
                },
                { outerResult in
                    switch outerResult {
                    case .finished:
                        partEnded(canceled: false) // we allow inner signals to be alive if there are no more values from outer signal
                    case .canceled:
                        completeOnce(.canceled)
                    case let .failed(error):
                        completeOnce(.failed(error))
                    }
                }
            )
        }
    }

    /// Signal which converts each value of this signal into an inner signal
    /// and produces values of the latest inner signal only.
    ///
    /// Each new value of this signal cancels the token of the previous inner
    /// signal. Values arriving from a replaced inner signal are dropped.
    /// This signal is always requested with `.unlimited` demand; the demand
    /// limits the values produced by the inner signals.
    func flatMapLatest<NewValue>(
        _ transform: @escaping (Value) -> Signal<NewValue, Failure>
    ) -> Signal<NewValue, Failure> {
        Signal<NewValue, Failure> { demand, cancelation, output, completion in
            let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
            /// Delivers the completion only for the first call.
            func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
                guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
                completion(completionResult)
            }
            // Number of parts still running: one for this (outer) signal plus one per live inner signal.
            // NOTE(review): a replaced inner signal which never reports its completion
            // keeps the counter above zero - confirm inner signals always complete.
            let pendingParts = ManagedAtomic<Int>(1)
            /// Marks one part as ended and completes when it was the last one.
            /// The result is `.canceled` only when the part was canceled through
            /// the requester's token, not when it was merely replaced.
            func partEnded(canceled: Bool) {
                guard pendingParts.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return }
                completeOnce(canceled && cancelation.isCanceled ? .canceled : .finished)
            }
            // Counter of values still allowed to be produced, `nil` for unlimited demand.
            let remainingCount: ManagedAtomic<UInt>?
            switch demand {
            case let .upTo(count):
                remainingCount = ManagedAtomic<UInt>(count)
            case .unlimited:
                remainingCount = nil
            }
            // Token of the latest inner signal; starts with a placeholder whose `cancel()` does nothing.
            let latestCancelation = ManagedAtomic<Box<Cancelation>>(Box(.canceled))
            self.source(
                .unlimited,
                cancelation,
                { value in
                    guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                    guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                    let localCancelation = Cancelation.manual
                    // Store the new token and cancel the one used by the previous inner signal.
                    latestCancelation
                        .exchange(
                            Box(localCancelation),
                            ordering: .sequentiallyConsistent
                        )
                        .value
                        .cancel()
                    let combinedCancelation = Cancelation.combined(cancelation, localCancelation)
                    pendingParts.wrappingIncrement(ordering: .sequentiallyConsistent)
                    transform(value)
                        .source(
                            demand,
                            combinedCancelation,
                            { innerValue in
                                // Only cancelation of the requester's token ends the whole signal...
                                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                                // ...a replaced inner signal just gets its late values dropped.
                                guard !localCancelation.isCanceled else { return }
                                if let remainingCount = remainingCount {
                                    guard remainingCount.loadThenWrappingDecrement(ordering: .sequentiallyConsistent) > 0
                                    else { return completeOnce(.finished) }
                                }
                                guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                                output(innerValue)
                            },
                            { innerResult in
                                switch innerResult {
                                case .finished:
                                    partEnded(canceled: false) // we allow outer signal to be alive if there are no more values from inner signal
                                case .canceled:
                                    partEnded(canceled: true) // we allow outer signal to be alive if inner signal was canceled
                                case let .failed(error):
                                    completeOnce(.failed(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes
                                }
                            }
                        )
                },
                { outerResult in
                    switch outerResult {
                    case .finished:
                        partEnded(canceled: false) // we allow inner signal to be alive if there are no more values from outer signal
                    case .canceled:
                        completeOnce(.canceled)
                    case let .failed(error):
                        completeOnce(.failed(error))
                    }
                }
            )
        }
    }

    /// Signal which delivers values and completion of this signal through the given scheduler.
    ///
    /// Values scheduled before cancelation but executed after it are dropped;
    /// a completion executed after cancelation is replaced with `.canceled`.
    func `switch`(
        to scheduler: Scheduler
    ) -> Self {
        Self { demand, cancelation, output, completion in
            self.source(
                demand,
                cancelation,
                { value in
                    scheduler.schedule {
                        // we won't deliver scheduled values after cancelation
                        guard !cancelation.isCanceled else { return }
                        output(value)
                    }
                },
                { completionResult in
                    scheduler.schedule {
                        // we won't deliver scheduled completion after cancelation
                        completion(cancelation.isCanceled ? .canceled : completionResult)
                    }
                }
            )
        }
    }
}
public extension Signal {
    /// Signal which produces only those values of this signal for which the given closure returns `true`.
    /// Demand, cancelation and completion are passed through unchanged.
    func filter(_ isIncluded: @escaping (Value) -> Bool) -> Self {
        Self { demand, cancelation, output, completion in
            self.source(
                demand,
                cancelation,
                { value in
                    if isIncluded(value) {
                        output(value)
                    }
                },
                completion
            )
        }
    }
}
public extension Signal {
    /// Not implemented yet - always traps with `fatalError`.
    ///
    /// NOTE(review): judging by the commented-out draft below, this is presumably
    /// meant to buffer up to `bufferSize` latest values - confirm the intended behavior.
    /// - Parameter bufferSize: currently unused.
    func shared(bufferSize: UInt) -> Self {
        fatalError("TODO: to complete - key component")
        // Draft of the implementation kept for reference:
        //
        // #warning("TODO: locking")
        // var buffer = Array<Value>()
        // buffer.reserveCapacity(Int(bufferSize))
        // var completionResult: SignalDemandResult<Failure>?
        // self.source(
        //     .unlimited,
        //     .never,
        //     { value in
        //         if buffer.count >= bufferSize {
        //             buffer.removeFirst()
        //         } else { /* */ }
        //         buffer.append(value)
        //     },
        //     { result in
        //         completionResult = result
        //     }
        // )
        // return Self { demand, cancelation, output, completion in
        //     guard !cancelation.isCanceled else { return completion(.canceled) }
        //     switch demand {
        //     case let .upTo(count):
        //     case .unlimited:
        //     }
        // }
    }
}
/// Signal which produces values of all given signals.
///
/// Every given signal is requested with the same demand; with `.upTo` demand
/// the number of produced values is additionally limited across all of them.
/// The result finishes when all given signals have finished (immediately when
/// none was given). It fails or becomes canceled as soon as any of them does.
/// Once the result completes, the remaining signals are canceled through a local token.
/// - Parameter signals: signals to merge.
public func merge<Value, Failure: Error>(
    _ signals: Signal<Value, Failure>...
) -> Signal<Value, Failure> {
    Signal<Value, Failure> { demand, cancelation, output, completion in
        guard !cancelation.isCanceled else { return completion(.canceled) }
        // Without any signal nothing would ever deliver the completion.
        guard !signals.isEmpty else { return completion(.finished) }
        let localCancelation = Cancelation.manual
        let combinedCancelation = Cancelation.combined(cancelation, localCancelation)
        let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        /// Delivers the completion only for the first call and cancels the remaining signals.
        func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
            defer { localCancelation.cancel() }
            guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(completionResult)
        }
        // Number of signals which have not finished yet.
        let pendingSignals = ManagedAtomic<Int>(signals.count)
        // Counter of values still allowed to be produced, `nil` for unlimited demand.
        let remainingCount: ManagedAtomic<UInt>?
        switch demand {
        case let .upTo(count):
            remainingCount = ManagedAtomic<UInt>(count)
        case .unlimited:
            remainingCount = nil
        }
        for signal in signals {
            signal
                .source(
                    demand,
                    combinedCancelation,
                    { value in
                        guard !combinedCancelation.isCanceled else { return completeOnce(.canceled) }
                        if let remainingCount = remainingCount {
                            guard remainingCount.loadThenWrappingDecrement(ordering: .sequentiallyConsistent) > 0
                            else { return completeOnce(.finished) }
                        }
                        guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                        output(value)
                    },
                    { completionResult in
                        switch completionResult {
                        case .finished:
                            // A finished signal must not end the others; finish with the last one.
                            guard pendingSignals.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0
                            else { return }
                            completeOnce(.finished)
                        case .canceled, .failed:
                            completeOnce(completionResult)
                        }
                    }
                )
        }
    }
}
| import protocol Atomics.AtomicReference | |
/// Reference wrapper around a value, conforming to `AtomicReference`
/// so the value can be stored in a `ManagedAtomic`.
private final class Box<Value>: AtomicReference {
    /// The wrapped value.
    fileprivate let value: Value

    /// Wraps the given value.
    fileprivate init(_ wrapped: Value) {
        value = wrapped
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Number of values requested from a signal.
public enum SignalDemand: Hashable, Comparable {
    /// At most the given number of values.
    case upTo(UInt)
    /// Any number of values.
    case unlimited

    /// Orders demands by the number of requested values.
    /// `.unlimited` is greater than every `.upTo` demand and not less than itself.
    public static func < (lhs: SignalDemand, rhs: SignalDemand) -> Bool {
        switch (lhs, rhs) {
        case (.unlimited, _):
            // Nothing is greater than unlimited demand.
            return false
        case (.upTo, .unlimited):
            return true
        case let (.upTo(lowerCount), .upTo(upperCount)):
            return lowerCount < upperCount
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Result passed to the completion callback of a signal request.
public enum SignalDemandResult<Failure: Error> {
    /// The signal ended without an error.
    case finished

    /// The signal ended with the given error.
    case failed(Failure)

    /// The signal ended because of cancelation.
    case canceled
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment