Created
March 24, 2021 09:37
-
-
Save KaQuMiQ/a76a2235b32cb6f06616d8907cd97109 to your computer and use it in GitHub Desktop.
Data tunnels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct Foundation.TimeInterval | |
import struct Foundation.Date | |
import class Atomics.ManagedAtomic | |
/// Cancelation token that can be used to cancel associated tasks. | |
public struct Cancelation { | |
private var status: () -> Bool | |
private var cancelation: () -> Void | |
} | |
public extension Cancelation { | |
/// Cancelation status. | |
var isCanceled: Bool { status() } | |
/// Cancel tasks and change status to canceled. | |
func cancel() { cancelation() } | |
} | |
public extension Cancelation { | |
/// Token which can become cancelled only when explicitly called. | |
static var manual: Self { | |
let state = ManagedAtomic<Bool>(false) | |
return Self( | |
status: { state.load(ordering: .sequentiallyConsistent) }, | |
cancelation: { state.store(true, ordering: .sequentiallyConsistent) } | |
) | |
} | |
/// Token which can become cancelled when explicitly called. | |
/// Takes closure executed on manual cancelation. | |
/// - warning: You have to ensure that closure execution is thread safe. | |
static func withClosure( | |
_ closure: @escaping () -> Void | |
) -> Self { | |
let state = ManagedAtomic<Bool>(false) | |
var closure: Optional<() -> Void> = closure | |
return Self( | |
status: { state.load(ordering: .sequentiallyConsistent) }, | |
cancelation: { | |
guard !state.exchange(true, ordering: .sequentiallyConsistent) else { return } | |
closure?() | |
closure = nil | |
}) | |
} | |
/// Token which can become cancelled when explicitly called | |
/// or automatically after timeout time. | |
static func withTimeout(_ timeInterval: TimeInterval) -> Self { | |
precondition(timeInterval > 0, "Cannot make timeout now or in past") | |
return withDeadline(Date(timeIntervalSinceNow: timeInterval)) | |
} | |
/// Token which can become cancelled when explicitly called | |
/// or automatically after passing deadline. | |
static func withDeadline( | |
_ date: Date, | |
now: @escaping () -> Date = Date.init | |
) -> Self { | |
assert(date.timeIntervalSince(now()) > 0, "Cannot make deadline now or in past") | |
let state = ManagedAtomic<Bool>(false) | |
return Self( | |
status: { state.load(ordering: .sequentiallyConsistent) || date.timeIntervalSince(now()) <= 0 }, | |
cancelation: { state.store(true, ordering: .sequentiallyConsistent) } | |
) | |
} | |
/// Token which combines given tokens. | |
/// On manual cancel cancels all tokens. | |
/// Is canceled only when all tokens are cancelled. | |
static func combined(_ tokens: Cancelation...) -> Self { | |
combined(tokens) | |
} | |
/// Token which combines given tokens. | |
/// On manual cancel cancels all tokens. | |
/// Is canceled only when all tokens are cancelled. | |
static func combined(_ tokens: Array<Cancelation>) -> Self { | |
assert(tokens.count > 1, "Cannot combine single or no token") | |
return Self( | |
status: { | |
for token in tokens { | |
if token.isCanceled { return true } | |
} | |
return false | |
}, | |
cancelation: { tokens.forEach { $0.cancel() } } | |
) | |
} | |
/// Token which will never become cancelled. | |
/// - warning: If you combine `never` cancelation token using `combined` function | |
/// it will prevent returned (combined) token from being ever canceled. | |
static var never: Self { | |
Self( | |
status: { false }, | |
cancelation: {} | |
) | |
} | |
static var canceled: Self { | |
Self( | |
status: { true }, | |
cancelation: {} | |
) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import class Dispatch.DispatchQueue | |
import struct Dispatch.DispatchTime | |
import enum Dispatch.DispatchTimeInterval | |
import struct Foundation.TimeInterval | |
public struct Scheduler { | |
public var scheduler: ( | |
_ delay: TimeInterval, // TODO: base implementation used to have deadline date/timestamp instead of delay | |
_ task: @escaping () -> Void | |
) -> Void | |
public init( | |
_ scheduler: @escaping ( | |
_ delay: TimeInterval, | |
_ task: @escaping () -> Void | |
) -> Void | |
) { | |
self.scheduler = scheduler | |
} | |
} | |
public extension Scheduler { | |
func schedule( | |
after delay: TimeInterval = 0, | |
_ task: @escaping () -> Void | |
) { | |
scheduler(delay, task) | |
} | |
} | |
public extension Scheduler { | |
static func dispatchQueue(_ queue: DispatchQueue) -> Self { | |
Self { delay, task in | |
if delay.isZero { | |
queue.async { task() } | |
} else { | |
let deadline = DispatchTime.now().advanced(by: .milliseconds(Int(delay * 1000))) | |
queue.asyncAfter(deadline: deadline) { task() } | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public enum TunnelClosure<Failure: Error> { | |
case closed | |
case terminated(Failure) | |
} | |
public struct Tunnel<Value, Failure: Error> { | |
internal var connect: ( | |
_ cancelation: Cancelation, | |
_ output: @escaping (Value) -> Void, | |
_ completion: @escaping (TunnelClosure<Failure>) -> Void | |
) -> Void | |
internal init( | |
connect: @escaping ( | |
_ cancelation: Cancelation, | |
_ output: @escaping (Value) -> Void, | |
_ completion: @escaping (TunnelClosure<Failure>) -> Void | |
) -> Void | |
) { | |
self.connect = connect | |
} | |
} | |
public extension Tunnel { | |
@discardableResult | |
func connect( | |
cancelation: Cancelation = .manual, | |
output: @escaping (Value) -> Void, | |
completion: @escaping (TunnelClosure<Failure>) -> Void | |
) -> Cancelation { | |
connect(cancelation, output, completion) | |
return cancelation | |
} | |
} | |
import class Foundation.NSRecursiveLock | |
import class Atomics.ManagedAtomic | |
public extension Tunnel { | |
static var closed: Self { | |
Self { _, _, completion in | |
completion(.closed) | |
} | |
} | |
static func terminated(reason error: Failure) -> Self { | |
Self { cancelation, _, completion in | |
if cancelation.isCanceled { | |
completion(.closed) // this should be verified | |
} else { | |
completion(.terminated(error)) | |
} | |
} | |
} | |
static func only(_ value: Value) -> Self { | |
Self { cancelation, output, completion in | |
guard !cancelation.isCanceled | |
else { return completion(.closed) } | |
output(value) | |
completion(.closed) | |
} | |
} | |
static func sequence<S>( | |
_ values: S | |
) -> Self | |
where S: Sequence, S.Element == Value { | |
return Self { cancelation, output, completion in | |
var iterator = values.makeIterator() | |
while let next = iterator.next() { | |
guard !cancelation.isCanceled | |
else { return completion(.closed) } | |
output(next) | |
} | |
completion(.closed) | |
} | |
} | |
} | |
public extension Tunnel { | |
static func future( | |
_ future: @escaping (@escaping (Result<Value, Failure>) -> Void) -> Void | |
) -> Self { | |
// we might need a buffer here | |
Self { cancelation, output, completion in | |
let completionStatus = ManagedAtomic<Bool>(false) | |
func completeOnce(_ completionResult: TunnelClosure<Failure>) { | |
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return } | |
completion(completionResult) | |
} | |
guard !cancelation.isCanceled | |
else { return completeOnce(.closed) } | |
future { result in | |
guard !cancelation.isCanceled | |
else { return completeOnce(.closed) } | |
switch result { | |
case let .success(value): | |
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return } | |
output(value) | |
completion(.closed) | |
case let .failure(error): | |
completeOnce(.terminated(error)) | |
} | |
} | |
} | |
} | |
} | |
public extension Tunnel { | |
func map<NewValue>( | |
_ transform: @escaping (Value) -> NewValue | |
) -> Tunnel<NewValue, Failure> { | |
Tunnel<NewValue, Failure> { cancelation, output, completion in | |
self.connect( | |
cancelation, | |
{ value in output(transform(value)) }, | |
completion | |
) | |
} | |
} | |
func flatMap<NewValue>( // TODO: validate correctness | |
_ transform: @escaping (Value) -> Tunnel<NewValue, Failure> | |
) -> Tunnel<NewValue, Failure> { | |
Tunnel<NewValue, Failure> { cancelation, output, completion in | |
let completionStatus = ManagedAtomic<Bool>(false) | |
func completeOnce(_ completionResult: TunnelClosure<Failure>) { | |
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return } | |
completion(completionResult) | |
} | |
let connectionCounter = ManagedAtomic<Int>(1) | |
func beginInnerConnection() { | |
connectionCounter.wrappingIncrement(ordering: .sequentiallyConsistent) | |
} | |
func closeIfNeeded() { | |
guard connectionCounter.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return } | |
completeOnce(.closed) | |
} | |
self.connect( | |
cancelation, | |
{ value in | |
guard !completionStatus.load(ordering: .sequentiallyConsistent) else { return } | |
beginInnerConnection() | |
transform(value) | |
.connect( | |
cancelation: cancelation, | |
output: output, | |
completion: { closure in | |
switch closure { | |
case .closed: | |
// we allow outer signal to be alive if there are no more values from inner signal | |
closeIfNeeded() | |
case let .terminated(error): | |
completeOnce(.terminated(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes or ignore it | |
} | |
} | |
) | |
}, | |
{ closure in | |
switch closure { | |
case .closed: | |
// we allow inner signal to be alive if there are no more values from outer signal | |
closeIfNeeded() | |
case let .terminated(error): | |
completeOnce(.terminated(error)) | |
} | |
} | |
) | |
} | |
} | |
func flatMapLatest<NewValue>( // TODO: validate correctness | |
_ transform: @escaping (Value) -> Tunnel<NewValue, Failure> | |
) -> Tunnel<NewValue, Failure> { | |
Tunnel<NewValue, Failure> { cancelation, output, completion in | |
let completionStatus = ManagedAtomic<Bool>(false) | |
func completeOnce(_ completionResult: TunnelClosure<Failure>) { | |
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return } | |
completion(completionResult) | |
} | |
let connectionCounter = ManagedAtomic<Int>(1) | |
func beginInnerConnection() { | |
connectionCounter.wrappingIncrement(ordering: .sequentiallyConsistent) | |
} | |
func closeIfNeeded() { | |
guard connectionCounter.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return } | |
completeOnce(.closed) | |
} | |
let latestCancelation = ManagedAtomic<Box<Cancelation>>(Box(.canceled)) // TODO: verify thread safety | |
self.connect( | |
cancelation, | |
{ value in | |
guard !completionStatus.load(ordering: .sequentiallyConsistent) else { return } | |
beginInnerConnection() | |
let localCancelation = Cancelation.manual | |
latestCancelation | |
.exchange( | |
Box(localCancelation), | |
ordering: .sequentiallyConsistent | |
) | |
.value | |
.cancel() | |
transform(value) | |
.connect( | |
cancelation: Cancelation | |
.combined(cancelation, localCancelation), | |
output: output, | |
completion: { closure in | |
switch closure { | |
case .closed: | |
// we allow outer signal to be alive if there are no more values from inner signal | |
closeIfNeeded() | |
case let .terminated(error): | |
completeOnce(.terminated(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes or ignore it | |
} | |
} | |
) | |
}, | |
{ closure in | |
switch closure { | |
case .closed: | |
// we allow inner signal to be alive if there are no more values from outer signal | |
closeIfNeeded() | |
case let .terminated(error): | |
completeOnce(.terminated(error)) | |
} | |
} | |
) | |
} | |
} | |
} | |
public extension Tunnel { | |
func filter( _ filter: @escaping (Value) -> Bool) -> Self { | |
Self { cancelation, output, completion in | |
self.connect( | |
cancelation, | |
{ value in | |
guard filter(value) else { return } | |
output(value) | |
}, | |
completion | |
) | |
} | |
} | |
} | |
public extension Tunnel { | |
func `switch`( | |
to scheduler: Scheduler | |
) -> Self { | |
Self { cancelation, output, completion in | |
self.connect( | |
cancelation, | |
{ value in | |
scheduler | |
.schedule { | |
// we wont deliver scheduled values after cancelation | |
guard !cancelation.isCanceled else { return } | |
output(value) | |
} | |
}, | |
{ completionResult in | |
scheduler.schedule { | |
// we wont deliver scheduled completion after cancelation | |
if cancelation.isCanceled { | |
completion(.closed) | |
} else { | |
completion(completionResult) | |
} | |
} | |
} | |
) | |
} | |
} | |
} | |
import protocol Atomics.AtomicReference | |
private final class Box<Value>: AtomicReference { | |
fileprivate let value: Value | |
fileprivate init(_ value: Value) { | |
self.value = value | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment