Skip to content

Instantly share code, notes, and snippets.

@KaQuMiQ
Created March 24, 2021 09:37
Show Gist options
  • Save KaQuMiQ/a76a2235b32cb6f06616d8907cd97109 to your computer and use it in GitHub Desktop.
Save KaQuMiQ/a76a2235b32cb6f06616d8907cd97109 to your computer and use it in GitHub Desktop.
Data tunnels
import struct Foundation.TimeInterval
import struct Foundation.Date
import class Atomics.ManagedAtomic
/// Cancelation token that can be used to cancel associated tasks.
///
/// A token is a pair of closures: one reporting the current status and one
/// performing the cancelation. Both are fixed for the lifetime of the token.
public struct Cancelation {
    /// Returns `true` when the token is considered canceled.
    private let status: () -> Bool
    /// Work performed when cancelation is requested.
    private let cancelation: () -> Void
}
public extension Cancelation {
    /// Current cancelation status, as reported by the token's status closure.
    var isCanceled: Bool {
        return status()
    }

    /// Requests cancelation by running the token's cancelation closure.
    /// The effect depends on how the token was created.
    func cancel() {
        cancelation()
    }
}
public extension Cancelation {
/// Token which becomes canceled only when `cancel()` is called explicitly.
/// Each access creates a new, independent token.
static var manual: Self {
    let flag = ManagedAtomic<Bool>(false)
    // Reads the shared flag.
    func readStatus() -> Bool {
        flag.load(ordering: .sequentiallyConsistent)
    }
    // Marks the token as canceled.
    func markCanceled() {
        flag.store(true, ordering: .sequentiallyConsistent)
    }
    return Self(status: readStatus, cancelation: markCanceled)
}
/// Token which becomes canceled when `cancel()` is called explicitly.
/// The given closure is executed on the first cancelation only; it is
/// released afterwards and later `cancel()` calls do nothing.
/// - warning: You have to ensure that closure execution is thread safe.
/// - Parameter closure: Work to perform on the first manual cancelation.
static func withClosure(
    _ closure: @escaping () -> Void
) -> Self {
    let flag = ManagedAtomic<Bool>(false)
    var pendingClosure: (() -> Void)? = closure
    return Self(
        status: { flag.load(ordering: .sequentiallyConsistent) },
        cancelation: {
            // Only the caller that flips the flag may run the closure.
            let wasAlreadyCanceled = flag.exchange(true, ordering: .sequentiallyConsistent)
            if wasAlreadyCanceled { return }
            pendingClosure?()
            // Drop the closure so whatever it captured can be released.
            pendingClosure = nil
        }
    )
}
/// Token which becomes canceled when `cancel()` is called explicitly
/// or automatically once the timeout has elapsed.
/// - Parameter timeInterval: Timeout in seconds, counted from the moment of the call.
///   Must be greater than zero (checked with `precondition`).
static func withTimeout(_ timeInterval: TimeInterval) -> Self {
    precondition(timeInterval > 0, "Cannot make timeout now or in past")
    let deadline = Date(timeIntervalSinceNow: timeInterval)
    return withDeadline(deadline)
}
/// Token which becomes canceled when `cancel()` is called explicitly
/// or automatically once the deadline has passed.
/// - Parameters:
///   - date: Deadline; expected to be in the future (checked with `assert`).
///   - now: Source of the current time, queried on every status check.
static func withDeadline(
    _ date: Date,
    now: @escaping () -> Date = Date.init
) -> Self {
    assert(date.timeIntervalSince(now()) > 0, "Cannot make deadline now or in past")
    let manualFlag = ManagedAtomic<Bool>(false)
    // True once the current time is at or past the deadline.
    func deadlineReached() -> Bool {
        date.timeIntervalSince(now()) <= 0
    }
    return Self(
        status: {
            // Manual cancelation wins; the clock is consulted only otherwise.
            if manualFlag.load(ordering: .sequentiallyConsistent) { return true }
            return deadlineReached()
        },
        cancelation: { manualFlag.store(true, ordering: .sequentiallyConsistent) }
    )
}
/// Token which combines given tokens (variadic convenience for the array overload).
/// On manual cancel cancels all tokens.
/// Reports canceled as soon as any of the tokens is canceled.
static func combined(_ tokens: Cancelation...) -> Self {
    let tokenList: [Cancelation] = tokens
    return combined(tokenList)
}
/// Token which combines given tokens.
/// On manual cancel cancels all tokens.
/// Reports canceled as soon as ANY of the tokens is canceled
/// (tokens are checked in order and the check stops at the first canceled one).
/// - Parameter tokens: Tokens to combine; expected to contain at least two
///   elements (checked with `assert`).
static func combined(_ tokens: Array<Cancelation>) -> Self {
    assert(tokens.count > 1, "Cannot combine single or no token")
    return Self(
        status: { tokens.contains(where: { $0.isCanceled }) },
        cancelation: { tokens.forEach { $0.cancel() } }
    )
}
/// Token which will never become canceled; calling `cancel()` on it does nothing.
/// - note: A token made by `combined` reports canceled as soon as any of its
///   members is canceled, so including `never` in a combination does not
///   prevent the combined token from being canceled through its other members.
static var never: Self {
    let alwaysActive: () -> Bool = { false }
    let ignoreCancel: () -> Void = {}
    return Self(status: alwaysActive, cancelation: ignoreCancel)
}
/// Token which is always canceled; calling `cancel()` on it does nothing.
static var canceled: Self {
    let alwaysCanceled: () -> Bool = { true }
    let ignoreCancel: () -> Void = {}
    return Self(status: alwaysCanceled, cancelation: ignoreCancel)
}
}
import class Dispatch.DispatchQueue
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval
import struct Foundation.TimeInterval
/// Wrapper around a function that runs a task, optionally after a delay.
public struct Scheduler {
    /// Underlying scheduling function: receives a delay and the task to run.
    public var scheduler: (
        _ delay: TimeInterval, // TODO: base implementation used to have deadline date/timestamp instead of delay
        _ task: @escaping () -> Void
    ) -> Void

    /// Creates a scheduler backed by the given scheduling function.
    /// - Parameter schedulingFunction: Called with the requested delay and the task for every scheduled task.
    public init(
        _ schedulingFunction: @escaping (
            _ delay: TimeInterval,
            _ task: @escaping () -> Void
        ) -> Void
    ) {
        scheduler = schedulingFunction
    }
}
public extension Scheduler {
    /// Passes the task to the underlying scheduling function.
    /// - Parameters:
    ///   - delay: Delay handed to the scheduling function; defaults to `0`.
    ///   - task: Work to run.
    func schedule(
        after delay: TimeInterval = 0,
        _ task: @escaping () -> Void
    ) {
        let submit = scheduler
        submit(delay, task)
    }
}
public extension Scheduler {
    /// Scheduler which runs tasks on the given dispatch queue.
    ///
    /// - A delay that is zero, negative or NaN submits the task with `async`.
    /// - A positive delay is converted to nanoseconds and submitted with `asyncAfter`,
    ///   so sub-millisecond delays are no longer truncated to zero.
    /// - A delay too large to express in nanoseconds (including infinity) can never
    ///   elapse; the task is not scheduled instead of trapping on the integer conversion.
    ///
    /// - Parameter queue: Queue used to execute scheduled tasks.
    static func dispatchQueue(_ queue: DispatchQueue) -> Self {
        Self { delay, task in
            guard delay > 0 else {
                queue.async { task() }
                return
            }
            // `Int(exactly:)` returns nil for values that do not fit in `Int`
            // where a plain `Int(...)` conversion would trap.
            guard let nanoseconds = Int(exactly: (delay * 1_000_000_000).rounded(.towardZero)) else {
                return
            }
            let deadline = DispatchTime.now().advanced(by: .nanoseconds(nanoseconds))
            queue.asyncAfter(deadline: deadline) { task() }
        }
    }
}
/// Describes how a tunnel connection ended.
public enum TunnelClosure<Failure> where Failure: Error {
    /// The connection ended without an error.
    case closed
    /// The connection ended with the associated failure.
    case terminated(Failure)
}
/// A source of `Value`s that ends with a `TunnelClosure`.
///
/// The tunnel is described entirely by its `connect` function, which receives
/// a cancelation token, an output handler and a completion handler.
public struct Tunnel<Value, Failure: Error> {
    /// Function invoked to establish a connection.
    var connect: (
        _ cancelation: Cancelation,
        _ output: @escaping (Value) -> Void,
        _ completion: @escaping (TunnelClosure<Failure>) -> Void
    ) -> Void

    /// Creates a tunnel from its connection function.
    init(
        connect: @escaping (
            _ cancelation: Cancelation,
            _ output: @escaping (Value) -> Void,
            _ completion: @escaping (TunnelClosure<Failure>) -> Void
        ) -> Void
    ) {
        self.connect = connect
    }
}
public extension Tunnel {
    /// Connects to the tunnel.
    /// - Parameters:
    ///   - cancelation: Token passed to the tunnel; defaults to a fresh `.manual` token.
    ///   - output: Handler passed to the tunnel for produced values.
    ///   - completion: Handler passed to the tunnel for its closure.
    /// - Returns: The token that was passed to the tunnel, so the caller can cancel the connection.
    @discardableResult
    func connect(
        cancelation: Cancelation = .manual,
        output: @escaping (Value) -> Void,
        completion: @escaping (TunnelClosure<Failure>) -> Void
    ) -> Cancelation {
        // Unlabeled call: invokes the stored `connect` function, not this method.
        self.connect(cancelation, output, completion)
        return cancelation
    }
}
import class Foundation.NSRecursiveLock
import class Atomics.ManagedAtomic
public extension Tunnel {
/// Tunnel which produces no values and completes with `.closed` on connection.
static var closed: Self {
    return Self(connect: { _, _, completion in
        completion(.closed)
    })
}
/// Tunnel which produces no values and completes on connection:
/// with `.terminated(error)`, or with `.closed` when the token is already canceled.
/// - Parameter error: Failure reported to the completion handler.
static func terminated(reason error: Failure) -> Self {
    Self { cancelation, _, completion in
        // reporting `.closed` for a canceled token should be verified
        let outcome: TunnelClosure<Failure> = cancelation.isCanceled
            ? .closed
            : .terminated(error)
        completion(outcome)
    }
}
/// Tunnel which outputs a single value and then completes with `.closed`.
/// The value is skipped when the token is already canceled on connection.
/// - Parameter value: The value to output.
static func only(_ value: Value) -> Self {
    Self { cancelation, output, completion in
        if !cancelation.isCanceled {
            output(value)
        }
        completion(.closed)
    }
}
/// Tunnel which outputs the elements of a sequence in order and then completes with `.closed`.
/// The token is checked before every element; once it is canceled the
/// remaining elements are skipped.
/// - Parameter values: Sequence of values to output.
static func sequence<S>(
    _ values: S
) -> Self
where S: Sequence, S.Element == Value {
    Self { cancelation, output, completion in
        for element in values {
            if cancelation.isCanceled { break }
            output(element)
        }
        completion(.closed)
    }
}
}
public extension Tunnel {
    /// Tunnel backed by a callback-based asynchronous operation.
    ///
    /// On connection the operation is started (unless the token is already canceled).
    /// A `.success` result is output and followed by `.closed`; a `.failure` result
    /// completes with `.terminated`. If the token is canceled before the result
    /// arrives, the result is dropped and the tunnel completes with `.closed`.
    /// Completion is delivered at most once per connection.
    /// - Parameter future: Operation which reports its result through the given callback.
    static func future(
        _ future: @escaping (@escaping (Result<Value, Failure>) -> Void) -> Void
    ) -> Self {
        // we might need a buffer here
        Self { cancelation, output, completion in
            let didComplete = ManagedAtomic<Bool>(false)
            // Returns true only for the first caller; everyone else gets false.
            func claimCompletion() -> Bool {
                !didComplete.exchange(true, ordering: .sequentiallyConsistent)
            }
            // Delivers the completion unless it has already been delivered.
            func finish(_ closure: TunnelClosure<Failure>) {
                if claimCompletion() {
                    completion(closure)
                }
            }
            if cancelation.isCanceled {
                finish(.closed)
                return
            }
            future { result in
                if cancelation.isCanceled {
                    finish(.closed)
                    return
                }
                switch result {
                case let .success(value):
                    // Claim first so the value is never output after a completion.
                    guard claimCompletion() else { return }
                    output(value)
                    completion(.closed)
                case let .failure(error):
                    finish(.terminated(error))
                }
            }
        }
    }
}
public extension Tunnel {
/// Tunnel which transforms every value of this tunnel with the given function.
/// Cancelation and completion are passed through unchanged.
/// - Parameter transform: Function applied to each value before it is output.
func map<NewValue>(
    _ transform: @escaping (Value) -> NewValue
) -> Tunnel<NewValue, Failure> {
    let upstream = self
    return Tunnel<NewValue, Failure> { cancelation, output, completion in
        upstream.connect(cancelation, { output(transform($0)) }, completion)
    }
}
/// Tunnel which, for every value of this (outer) tunnel, connects to the inner
/// tunnel produced by `transform` and forwards the inner values to `output`.
///
/// Completion rules, as implemented below:
/// - `.closed` is delivered once the outer tunnel and every started inner
///   tunnel have reported `.closed`.
/// - The first `.terminated` from the outer or any inner tunnel is delivered immediately.
/// - Completion is delivered at most once.
///
/// All inner tunnels share the cancelation token of the outer connection.
/// - Parameter transform: Produces the inner tunnel for a given outer value.
// NOTE(review): inner tunnels receive `output` directly, so values from inner
// tunnels that are still running after a `.terminated` completion are not
// suppressed — confirm this is intended.
func flatMap<NewValue>( // TODO: validate correctness
_ transform: @escaping (Value) -> Tunnel<NewValue, Failure>
) -> Tunnel<NewValue, Failure> {
Tunnel<NewValue, Failure> { cancelation, output, completion in
// Set to true by the first completion; guards against delivering it twice.
let completionStatus = ManagedAtomic<Bool>(false)
func completeOnce(_ completionResult: TunnelClosure<Failure>) {
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return }
completion(completionResult)
}
// Number of open connections; starts at 1 to account for the outer connection.
let connectionCounter = ManagedAtomic<Int>(1)
func beginInnerConnection() {
connectionCounter.wrappingIncrement(ordering: .sequentiallyConsistent)
}
// Called when any connection closes; completes when it was the last open one.
func closeIfNeeded() {
guard connectionCounter.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return }
completeOnce(.closed)
}
self.connect(
cancelation,
{ value in
// Do not start new inner connections once completion was delivered.
guard !completionStatus.load(ordering: .sequentiallyConsistent) else { return }
beginInnerConnection()
transform(value)
.connect(
cancelation: cancelation,
output: output,
completion: { closure in
switch closure {
case .closed:
// we allow outer signal to be alive if there are no more values from inner signal
closeIfNeeded()
case let .terminated(error):
completeOnce(.terminated(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes or ignore it
}
}
)
},
{ closure in
switch closure {
case .closed:
// we allow inner signal to be alive if there are no more values from outer signal
closeIfNeeded()
case let .terminated(error):
completeOnce(.terminated(error))
}
}
)
}
}
/// Tunnel which, for every value of this (outer) tunnel, connects to the inner
/// tunnel produced by `transform`, canceling the token of the previous inner connection.
///
/// Each inner connection gets a token combining the outer token with a fresh
/// `.manual` token, so it counts as canceled when either the whole connection
/// is canceled or a newer outer value replaced it.
///
/// Completion rules, as implemented below:
/// - `.closed` is delivered once the outer tunnel and every started inner
///   tunnel have reported `.closed`.
/// - The first `.terminated` from the outer or any inner tunnel is delivered immediately.
/// - Completion is delivered at most once.
/// - Parameter transform: Produces the inner tunnel for a given outer value.
// NOTE(review): a replaced inner connection only decrements the counter when it
// reports `.closed`; this presumably relies on inner tunnels completing after
// their token is canceled — confirm.
func flatMapLatest<NewValue>( // TODO: validate correctness
_ transform: @escaping (Value) -> Tunnel<NewValue, Failure>
) -> Tunnel<NewValue, Failure> {
Tunnel<NewValue, Failure> { cancelation, output, completion in
// Set to true by the first completion; guards against delivering it twice.
let completionStatus = ManagedAtomic<Bool>(false)
func completeOnce(_ completionResult: TunnelClosure<Failure>) {
guard !completionStatus.exchange(true, ordering: .sequentiallyConsistent) else { return }
completion(completionResult)
}
// Number of open connections; starts at 1 to account for the outer connection.
let connectionCounter = ManagedAtomic<Int>(1)
func beginInnerConnection() {
connectionCounter.wrappingIncrement(ordering: .sequentiallyConsistent)
}
// Called when any connection closes; completes when it was the last open one.
func closeIfNeeded() {
guard connectionCounter.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return }
completeOnce(.closed)
}
// Token of the most recent inner connection; starts with the `.canceled`
// token so the first exchange has a previous token to cancel.
let latestCancelation = ManagedAtomic<Box<Cancelation>>(Box(.canceled)) // TODO: verify thread safety
self.connect(
cancelation,
{ value in
// Do not start new inner connections once completion was delivered.
guard !completionStatus.load(ordering: .sequentiallyConsistent) else { return }
beginInnerConnection()
let localCancelation = Cancelation.manual
// Install the new token and cancel the one it replaces.
latestCancelation
.exchange(
Box(localCancelation),
ordering: .sequentiallyConsistent
)
.value
.cancel()
transform(value)
.connect(
cancelation: Cancelation
.combined(cancelation, localCancelation),
output: output,
completion: { closure in
switch closure {
case .closed:
// we allow outer signal to be alive if there are no more values from inner signal
closeIfNeeded()
case let .terminated(error):
completeOnce(.terminated(error)) // TODO: verify if we shouldn't postpone passing failure from inner until outer completes or ignore it
}
}
)
},
{ closure in
switch closure {
case .closed:
// we allow inner signal to be alive if there are no more values from outer signal
closeIfNeeded()
case let .terminated(error):
completeOnce(.terminated(error))
}
}
)
}
}
}
public extension Tunnel {
    /// Tunnel which outputs only the values of this tunnel accepted by the predicate.
    /// Cancelation and completion are passed through unchanged.
    /// - Parameter filter: Returns `true` for values that should be output.
    func filter( _ filter: @escaping (Value) -> Bool) -> Self {
        let upstream = self
        return Self { cancelation, output, completion in
            upstream.connect(
                cancelation,
                { candidate in
                    if filter(candidate) {
                        output(candidate)
                    }
                },
                completion
            )
        }
    }
}
public extension Tunnel {
    /// Tunnel which delivers the values and completion of this tunnel through the given scheduler.
    ///
    /// Cancelation is checked again when a scheduled delivery runs: values are
    /// dropped after cancelation, and the completion is replaced with `.closed`.
    /// - Parameter scheduler: Scheduler used to deliver values and completion.
    func `switch`(
        to scheduler: Scheduler
    ) -> Self {
        Self { cancelation, output, completion in
            self.connect(
                cancelation,
                { value in
                    scheduler.schedule {
                        // scheduled values are not delivered after cancelation
                        if cancelation.isCanceled { return }
                        output(value)
                    }
                },
                { upstreamClosure in
                    scheduler.schedule {
                        // after cancelation the original completion is replaced with `.closed`
                        completion(cancelation.isCanceled ? .closed : upstreamClosure)
                    }
                }
            )
        }
    }
}
import protocol Atomics.AtomicReference
/// Immutable reference wrapper which allows storing a value of any type
/// in a `ManagedAtomic` (through the `AtomicReference` conformance).
private final class Box<Value>: AtomicReference {
    /// The wrapped value.
    fileprivate let value: Value

    /// Wraps the given value.
    fileprivate init(_ wrapped: Value) {
        value = wrapped
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment