Created
November 5, 2019 19:56
-
-
Save dchohfi/92e5babb47f5f136a8986fca17acc47b to your computer and use it in GitHub Desktop.
Swift inout scan
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension Publisher { | |
public func inoutScan<Result>( | |
_ initialResult: Result, | |
_ nextPartialResult: @escaping (inout Result, Output) -> Void | |
) -> Publishers.InoutScan<Self, Result> { | |
return .init(upstream: self, | |
initialResult: initialResult, | |
nextPartialResult: nextPartialResult) | |
} | |
public func tryInoutScan<Result>( | |
_ initialResult: Result, | |
_ nextPartialResult: @escaping (inout Result, Output) throws -> Void | |
) -> Publishers.TryInoutScan<Self, Result> { | |
return .init(upstream: self, | |
initialResult: initialResult, | |
nextPartialResult: nextPartialResult) | |
} | |
} | |
extension Publishers { | |
public struct InoutScan<Upstream: Publisher, Output>: Publisher { | |
public typealias Failure = Upstream.Failure | |
public let upstream: Upstream | |
public let initialResult: Output | |
public let nextPartialResult: (inout Output, Upstream.Output) -> Void | |
public init(upstream: Upstream, | |
initialResult: Output, | |
nextPartialResult: @escaping (inout Output, Upstream.Output) -> Void) { | |
self.upstream = upstream | |
self.initialResult = initialResult | |
self.nextPartialResult = nextPartialResult | |
} | |
public func receive<Downstream: Subscriber>(subscriber: Downstream) | |
where Output == Downstream.Input, Upstream.Failure == Downstream.Failure | |
{ | |
upstream.subscribe(Inner(downstream: subscriber, | |
initialResult: initialResult, | |
nextPartialResult: nextPartialResult)) | |
} | |
} | |
public struct TryInoutScan<Upstream: Publisher, Output>: Publisher { | |
public typealias Failure = Error | |
public let upstream: Upstream | |
public let initialResult: Output | |
public let nextPartialResult: (inout Output, Upstream.Output) throws -> Void | |
public init( | |
upstream: Upstream, | |
initialResult: Output, | |
nextPartialResult: @escaping (inout Output, Upstream.Output) throws -> Void | |
) { | |
self.upstream = upstream | |
self.initialResult = initialResult | |
self.nextPartialResult = nextPartialResult | |
} | |
public func receive<Downstream: Subscriber>(subscriber: Downstream) | |
where Output == Downstream.Input, Downstream.Failure == Error | |
{ | |
upstream.subscribe(Inner(downstream: subscriber, | |
initialResult: initialResult, | |
nextPartialResult: nextPartialResult)) | |
} | |
} | |
} | |
extension Publishers.InoutScan { | |
private final class Inner<Downstream: Subscriber> | |
: Subscriber, | |
CustomStringConvertible, | |
CustomReflectable, | |
CustomPlaygroundDisplayConvertible | |
where Upstream.Failure == Downstream.Failure | |
{ | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private let downstream: Downstream | |
private let nextPartialResult: (inout Downstream.Input, Input) -> Void | |
private var result: Downstream.Input | |
fileprivate init( | |
downstream: Downstream, | |
initialResult: Downstream.Input, | |
nextPartialResult: @escaping (inout Downstream.Input, Input) -> Void | |
) | |
{ | |
self.downstream = downstream | |
self.result = initialResult | |
self.nextPartialResult = nextPartialResult | |
} | |
func receive(subscription: Subscription) { | |
downstream.receive(subscription: subscription) | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
nextPartialResult(&result, input) | |
return downstream.receive(result) | |
} | |
func receive(completion: Subscribers.Completion<Failure>) { | |
downstream.receive(completion: completion) | |
} | |
var description: String { return "Scan" } | |
var customMirror: Mirror { | |
let children: [Mirror.Child] = [ | |
("downstream", downstream), | |
("result", result) | |
] | |
return Mirror(self, children: children) | |
} | |
var playgroundDescription: Any { return description } | |
} | |
} | |
extension Publishers.TryInoutScan { | |
private final class Inner<Downstream: Subscriber> | |
: Subscriber, | |
Subscription, | |
CustomStringConvertible, | |
CustomReflectable, | |
CustomPlaygroundDisplayConvertible | |
where Downstream.Failure == Error | |
{ | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private let downstream: Downstream | |
private let nextPartialResult: (inout Downstream.Input, Input) throws -> Void | |
private var result: Downstream.Input | |
private var status = SubscriptionStatus.awaitingSubscription | |
private var lock: NSLock? = NSLock() | |
private var finished = false | |
fileprivate init( | |
downstream: Downstream, | |
initialResult: Downstream.Input, | |
nextPartialResult: | |
@escaping (inout Downstream.Input, Input) throws -> Void | |
) { | |
self.downstream = downstream | |
self.nextPartialResult = nextPartialResult | |
self.result = initialResult | |
} | |
deinit { | |
lock = nil | |
} | |
func receive(subscription: Subscription) { | |
lock?.lock() | |
guard case .awaitingSubscription = status else { | |
lock?.unlock() | |
subscription.cancel() | |
return | |
} | |
status = .subscribed(subscription) | |
lock?.unlock() | |
downstream.receive(subscription: self) | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
do { | |
try nextPartialResult(&result, input) | |
return downstream.receive(result) | |
} catch { | |
lock?.lock() | |
guard case let .subscribed(subscription) = status else { | |
lock?.unlock() | |
return .none | |
} | |
status = .terminal | |
lock?.unlock() | |
subscription.cancel() | |
downstream.receive(completion: .failure(error)) | |
return .none | |
} | |
} | |
func receive(completion: Subscribers.Completion<Upstream.Failure>) { | |
// Combine doesn't use locking in this method! | |
guard case .subscribed = status else { | |
return | |
} | |
downstream.receive(completion: .finished) | |
} | |
func request(_ demand: Subscribers.Demand) { | |
lock?.lock() | |
guard case let .subscribed(subscription) = status else { | |
lock?.unlock() | |
return | |
} | |
lock?.unlock() | |
subscription.request(demand) | |
} | |
func cancel() { | |
lock?.lock() | |
guard case let .subscribed(subscription) = status else { | |
lock?.unlock() | |
return | |
} | |
status = .terminal | |
lock?.unlock() | |
subscription.cancel() | |
} | |
var description: String { return "TryScan" } | |
var customMirror: Mirror { | |
lock?.lock() | |
defer { lock?.unlock() } | |
let children: [Mirror.Child] = [ | |
("downstream", downstream), | |
("status", status), | |
("result", result) | |
] | |
return Mirror(self, children: children) | |
} | |
var playgroundDescription: Any { return description } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment