Skip to content

Instantly share code, notes, and snippets.

@KaQuMiQ
Last active March 10, 2021 15:54
Show Gist options
  • Select an option

  • Save KaQuMiQ/65fab5909b38c8b2e0c24be4ca70006a to your computer and use it in GitHub Desktop.

Select an option

Save KaQuMiQ/65fab5909b38c8b2e0c24be4ca70006a to your computer and use it in GitHub Desktop.
Futura3
import struct Foundation.TimeInterval
import struct Foundation.Date
import class Atomics.ManagedAtomic
/// A token used to request, and to observe, cancelation of the tasks associated with it.
///
/// The token is a pair of closures: one reporting the current status and one
/// performing the cancelation.
public struct Cancelation {
    /// Reports whether the token is currently canceled.
    private let status: () -> Bool
    /// Performs the cancelation.
    private let cancelation: () -> Void
}
public extension Cancelation {
    /// Current cancelation status; evaluates the stored status closure on every access.
    var isCanceled: Bool {
        return status()
    }

    /// Requests cancelation by invoking the stored cancelation closure.
    func cancel() {
        cancelation()
    }
}
public extension Cancelation {
/// A fresh token that becomes canceled only when `cancel()` is called on it.
static var manual: Self {
    // Every access creates a new, independent atomic flag.
    let canceledFlag = ManagedAtomic<Bool>(false)
    let readFlag: () -> Bool = {
        canceledFlag.load(ordering: .sequentiallyConsistent)
    }
    let raiseFlag: () -> Void = {
        canceledFlag.store(true, ordering: .sequentiallyConsistent)
    }
    return Self(status: readFlag, cancelation: raiseFlag)
}
/// A token that becomes canceled when `cancel()` is called on it and runs
/// the given closure on the first such call only.
/// - Parameter closure: Work executed once, on the first manual cancelation.
/// - warning: You have to ensure that closure execution is thread safe.
static func withClosure(
    _ closure: @escaping () -> Void
) -> Self {
    let canceledFlag = ManagedAtomic<Bool>(false)
    // Held optionally so the closure can be released after it has run.
    var pendingClosure: (() -> Void)? = closure
    return Self(
        status: {
            canceledFlag.load(ordering: .sequentiallyConsistent)
        },
        cancelation: {
            // `exchange` returns the previous value: `true` means somebody already canceled.
            let wasCanceled = canceledFlag.exchange(true, ordering: .sequentiallyConsistent)
            if wasCanceled { return }
            pendingClosure?()
            pendingClosure = nil
        }
    )
}
/// A token that becomes canceled when `cancel()` is called on it,
/// or automatically once the given time interval has elapsed.
/// - Parameter timeInterval: Timeout counted from the moment of the call; must be greater than zero.
static func withTimeout(_ timeInterval: TimeInterval) -> Self {
    precondition(timeInterval > 0, "Cannot make timeout now or in past")
    let deadline = Date(timeIntervalSinceNow: timeInterval)
    return withDeadline(deadline)
}
/// A token that becomes canceled when `cancel()` is called on it,
/// or automatically once the deadline has passed.
/// - Parameters:
///   - date: The deadline; asserted (debug builds only) to lie in the future.
///   - now: Provider of the current date, `Date.init` by default.
static func withDeadline(
    _ date: Date,
    now: @escaping () -> Date = Date.init
) -> Self {
    assert(date.timeIntervalSince(now()) > 0, "Cannot make deadline now or in past")
    let canceledFlag = ManagedAtomic<Bool>(false)
    return Self(
        status: {
            // A manual cancelation wins; the clock is consulted only when the flag is not set.
            if canceledFlag.load(ordering: .sequentiallyConsistent) {
                return true
            }
            return date.timeIntervalSince(now()) <= 0
        },
        cancelation: {
            canceledFlag.store(true, ordering: .sequentiallyConsistent)
        }
    )
}
/// Token which combines given tokens.
/// Calling `cancel()` on it cancels all given tokens.
/// It reports canceled as soon as at least one of the given tokens is canceled.
/// - Parameter tokens: Tokens to combine; forwarded to the array-based overload.
static func combined(_ tokens: Cancelation...) -> Self {
    combined(tokens)
}
/// Token which combines given tokens.
/// Calling `cancel()` on it cancels all given tokens.
/// It reports canceled as soon as at least one of the given tokens is canceled.
/// - Parameter tokens: Tokens to combine; asserted (debug builds only) to contain more than one token.
static func combined(_ tokens: Array<Cancelation>) -> Self {
    assert(tokens.count > 1, "Cannot combine single or no token")
    return Self(
        // Short-circuits on the first canceled token.
        status: { tokens.contains(where: { $0.isCanceled }) },
        cancelation: { tokens.forEach { $0.cancel() } }
    )
}
/// Token which will never become cancelled: its status is always `false`
/// and calling `cancel()` on it does nothing.
/// - note: Passing `never` to `combined` does not influence the combined status;
/// the combined token still reports canceled as soon as any other token is canceled.
static var never: Self {
    Self(
        status: { false },
        cancelation: {}
    )
}
/// Token which is canceled from the start: its status is always `true`
/// and calling `cancel()` on it does nothing.
static var canceled: Self {
    let alwaysCanceled: () -> Bool = { true }
    let noOperation: () -> Void = {}
    return Self(status: alwaysCanceled, cancelation: noOperation)
}
}
import class Dispatch.DispatchQueue
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval
import struct Foundation.TimeInterval
/// A value wrapping a closure that decides how and when a task gets executed.
public struct Scheduler {
    /// Scheduling closure; receives the requested delay and the task to run.
    public var scheduler: (_ delay: TimeInterval, _ task: @escaping () -> Void) -> Void

    /// Creates a scheduler backed by the given scheduling closure.
    /// - Parameter scheduler: Closure invoked with the delay and the task for every scheduled task.
    public init(
        _ scheduler: @escaping (_ delay: TimeInterval, _ task: @escaping () -> Void) -> Void
    ) {
        self.scheduler = scheduler
    }
}
public extension Scheduler {
    /// Hands the task over to the underlying scheduling closure.
    /// - Parameters:
    ///   - delay: Requested delay, zero by default.
    ///   - task: Work to execute.
    func schedule(after delay: TimeInterval = 0, _ task: @escaping () -> Void) {
        let schedulingClosure = scheduler
        schedulingClosure(delay, task)
    }
}
public extension Scheduler {
    /// Scheduler executing tasks asynchronously on the given dispatch queue.
    ///
    /// A zero delay dispatches the task with `async`; any other delay is converted
    /// to whole milliseconds and dispatched with `asyncAfter`.
    /// - Parameter queue: Queue on which tasks are executed.
    static func dispatchQueue(_ queue: DispatchQueue) -> Self {
        Self { delay, task in
            guard !delay.isZero else {
                queue.async { task() }
                return
            }
            // The delay is expressed in seconds and truncated to milliseconds here.
            let deadline = DispatchTime.now().advanced(by: .milliseconds(Int(delay * 1000)))
            queue.asyncAfter(deadline: deadline) { task() }
        }
    }
}
/// A source of values of type `Value` that ends with a `SignalDemandResult<Failure>`.
///
/// The signal is a wrapper around a single `source` closure which is invoked
/// for every request made on the signal.
public struct Signal<Value, Failure: Error> {
    /// Closure producing values for a single request.
    ///
    /// It receives the demand, the cancelation token, a callback for values
    /// and a callback for the completion result.
    internal var source: (
        _ demand: SignalDemand,
        _ cancelation: Cancelation,
        _ output: @escaping (Value) -> Void,
        _ completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Void

    /// Creates a signal from its source closure.
    internal init(
        source: @escaping (
            _ demand: SignalDemand,
            _ cancelation: Cancelation,
            _ output: @escaping (Value) -> Void,
            _ completion: @escaping (SignalDemandResult<Failure>) -> Void
        ) -> Void
    ) {
        self.source = source
    }
}
public extension Signal {
    /// Invokes the signal source with the given demand, token and callbacks.
    /// - Parameters:
    ///   - demand: Requested amount of values, `.unlimited` by default.
    ///   - cancelation: Token associated with this request, a fresh `.manual` token by default.
    ///   - output: Callback receiving values.
    ///   - completion: Callback receiving the completion result.
    /// - Returns: The cancelation token that was passed to the source.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        output: @escaping (Value) -> Void,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        let token = cancelation
        source(demand, token, output, completion)
        return token
    }
}
public extension Signal where Value == Void {
    /// Variant of `request` for signals of `Void`, taking an output callback without arguments.
    /// - Parameters:
    ///   - demand: Requested amount of values, `.unlimited` by default.
    ///   - cancelation: Token associated with this request, a fresh `.manual` token by default.
    ///   - output: Callback invoked for every value.
    ///   - completion: Callback receiving the completion result.
    /// - Returns: The cancelation token returned by the general `request`.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        output: @escaping () -> Void,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        // Adapt the argument-less callback to the `(Void) -> Void` shape of the general request.
        let forwardOutput: (Value) -> Void = { _ in output() }
        return self.request(
            demand,
            cancelation: cancelation,
            output: forwardOutput,
            completion: completion
        )
    }
}
public extension Signal where Value == Never {
    /// Variant of `request` for signals of `Never`, which needs no output callback.
    /// - Parameters:
    ///   - demand: Requested amount of values, `.unlimited` by default.
    ///   - cancelation: Token associated with this request, a fresh `.manual` token by default.
    ///   - completion: Callback receiving the completion result.
    /// - Returns: The cancelation token returned by the general `request`.
    @discardableResult
    func request(
        _ demand: SignalDemand = .unlimited,
        cancelation: Cancelation = .manual,
        completion: @escaping (SignalDemandResult<Failure>) -> Void
    ) -> Cancelation {
        let ignoreOutput: (Value) -> Void = { _ in }
        return self.request(
            demand,
            cancelation: cancelation,
            output: ignoreOutput,
            completion: completion
        )
    }
}
import class Foundation.NSRecursiveLock
import class Atomics.ManagedAtomic
public extension Signal {
/// Signal that emits no values and completes right away:
/// with `.canceled` when the token is already canceled, with `.finished` otherwise.
static var finished: Self {
    Self { _, cancelation, _, completion in
        completion(cancelation.isCanceled ? .canceled : .finished)
    }
}
/// Signal that emits no values and completes right away:
/// with `.canceled` when the token is already canceled, with `.failed(error)` otherwise.
/// - Parameter error: Failure delivered to the completion callback.
static func failed(error: Failure) -> Self {
    Self { _, cancelation, _, completion in
        guard !cancelation.isCanceled else {
            return completion(.canceled)
        }
        completion(.failed(error))
    }
}
/// Signal that emits the given value once and then finishes.
///
/// Completes with `.canceled` without emitting when the token is already canceled,
/// and finishes without emitting when the demand is `.upTo(0)`.
/// - Parameter value: The value to emit.
static func just(_ value: Value) -> Self {
    Self { demand, cancelation, output, completion in
        if cancelation.isCanceled {
            return completion(.canceled)
        }
        // Zero demand: nothing may be delivered.
        if case .upTo(0) = demand {
            return completion(.finished)
        }
        output(value)
        completion(.finished)
    }
}
/// Signal that emits elements of the given sequence, in order, and then finishes.
///
/// The cancelation token is checked before each element is delivered; when it is
/// canceled the signal completes with `.canceled`. With `.upTo(n)` demand at most
/// `n` elements are pulled from the sequence and delivered.
/// - Parameter values: Sequence providing the values; a new iterator is made for every request.
static func sequence<S>(
    _ values: S
) -> Self
where S: Sequence, S.Element == Value {
    return Self { demand, cancelation, output, completion in
        var iterator = values.makeIterator()
        switch demand {
        case .unlimited:
            while let next = iterator.next() {
                guard !cancelation.isCanceled else { return completion(.canceled) }
                output(next)
            }
        case var .upTo(remaining):
            // Check the remaining demand BEFORE pulling, so no element is taken
            // from the iterator only to be thrown away once the demand is exhausted.
            while remaining > 0, let next = iterator.next() {
                guard !cancelation.isCanceled else { return completion(.canceled) }
                output(next)
                remaining -= 1
            }
        }
        completion(.finished)
    }
}
/// Signal that emits a single value produced by `load` and then finishes.
///
/// The value is loaded on the first request that needs it and cached; later
/// requests reuse the cached value. Loading and cache access happen under a lock.
/// - Parameter load: Closure producing the value.
static func lazy(
    _ load: @escaping () -> Value
) -> Self {
    let cacheLock = NSRecursiveLock() // we might also try out RWLock or plain mutex here
    var cachedValue: Value? = nil

    // Returns the cached value, loading and storing it first when the cache is empty.
    func resolveValue() -> Value {
        cacheLock.lock()
        defer { cacheLock.unlock() }
        if let cached = cachedValue {
            return cached
        }
        let loaded = load()
        cachedValue = loaded
        return loaded
    }

    return Self { demand, cancelation, output, completion in
        if cancelation.isCanceled {
            return completion(.canceled)
        }
        // Zero demand: finish without touching the loader.
        if case .upTo(0) = demand {
            return completion(.finished)
        }
        let value = resolveValue()
        // Loading may take a while, so the token is checked again before delivery.
        if cancelation.isCanceled {
            return completion(.canceled)
        }
        output(value)
        completion(.finished)
    }
}
/// Signal that emits the single value delivered through a callback and then finishes.
///
/// `deferred` is invoked for every request (unless the token is already canceled
/// or the demand is `.upTo(0)`) and receives the callback through which the value is passed.
/// Completion is reported at most once per request.
/// - Parameter deferred: Closure starting the work; it is given the callback to deliver the value with.
static func deferred(
    _ deferred: @escaping (@escaping (Value) -> Void) -> Void
) -> Self {
    Self { demand, cancelation, output, completion in
        let didComplete = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only if nothing has completed this request yet.
        func completeOnce(_ result: SignalDemandResult<Failure>) {
            guard !didComplete.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(result)
        }

        if cancelation.isCanceled {
            return completeOnce(.canceled)
        }
        if case .upTo(0) = demand {
            return completeOnce(.finished)
        }
        deferred { value in
            if cancelation.isCanceled {
                return completeOnce(.canceled)
            }
            // Claim completion first so that the value is delivered at most once.
            if didComplete.exchange(true, ordering: .sequentiallyConsistent) {
                return
            }
            output(value)
            completion(.finished)
        }
    }
}
/// Signal that emits the single value of a `Result` delivered through a callback,
/// or fails with the error of that `Result`.
///
/// `future` is invoked for every request (unless the token is already canceled
/// or the demand is `.upTo(0)`). Completion is reported at most once per request.
/// - Parameter future: Closure starting the work; it is given the callback to deliver the result with.
static func future(
    _ future: @escaping (@escaping (Result<Value, Failure>) -> Void) -> Void
) -> Self {
    Self { demand, cancelation, output, completion in
        let didComplete = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only if nothing has completed this request yet.
        func completeOnce(_ result: SignalDemandResult<Failure>) {
            guard !didComplete.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(result)
        }

        if cancelation.isCanceled {
            return completeOnce(.canceled)
        }
        if case .upTo(0) = demand {
            return completeOnce(.finished)
        }
        future { result in
            if cancelation.isCanceled {
                return completeOnce(.canceled)
            }
            switch result {
            case let .failure(error):
                completeOnce(.failed(error))
            case let .success(value):
                // Claim completion first so that the value is delivered at most once.
                if didComplete.exchange(true, ordering: .sequentiallyConsistent) {
                    return
                }
                output(value)
                completion(.finished)
            }
        }
    }
}
/// Signal that emits values pushed by the given stream closure until it is closed.
///
/// `stream` is invoked for every request and receives the cancelation token,
/// a callback for values and a `close` callback. Closing with an error completes
/// the signal with `.failed`, closing with `nil` completes it with `.finished`.
/// With `.upTo(n)` demand at most `n` values are delivered; the first value that
/// arrives after the demand is exhausted completes the signal with `.finished`
/// and every later value is dropped. With `.upTo(0)` the signal finishes
/// immediately without starting the stream. Completion is reported at most once per request.
/// - Parameter stream: Closure starting the stream.
static func stream(
    _ stream: @escaping (
        _ cancelation: Cancelation,
        _ output: @escaping (Value) -> Void,
        _ close: @escaping (Optional<Failure>) -> Void
    ) -> Void
) -> Self {
    Self { demand, cancelation, output, completion in
        let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only if nothing has completed this request yet.
        func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
            guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(completionResult)
        }
        guard !cancelation.isCanceled else { return completeOnce(.canceled) }

        // Returns `true` when a value may still be delivered under the requested demand.
        let claimDemand: () -> Bool
        switch demand {
        case .unlimited:
            claimDemand = { true }
        case .upTo(0):
            // Nothing may be delivered, same as in the other factories.
            return completeOnce(.finished)
        case let .upTo(count):
            let remainingCount = ManagedAtomic<UInt>(count)
            claimDemand = {
                // Saturating decrement: the counter stops at zero instead of wrapping
                // around to `UInt.max`, which would let further values through.
                var current = remainingCount.load(ordering: .sequentiallyConsistent)
                while current > 0 {
                    let (exchanged, original) = remainingCount.compareExchange(
                        expected: current,
                        desired: current - 1,
                        ordering: .sequentiallyConsistent
                    )
                    if exchanged { return true }
                    current = original
                }
                return false
            }
        }

        stream(
            cancelation,
            { value in
                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                guard claimDemand() else { return completeOnce(.finished) }
                output(value)
            },
            { closing in
                if let error = closing {
                    completeOnce(.failed(error))
                } else {
                    completeOnce(.finished)
                }
            }
        )
    }
}
}
public extension Signal {
/// Signal that emits values of this signal converted with `transform`.
/// Demand, cancelation token and completion are passed to this signal unchanged.
/// - Parameter transform: Conversion applied to every value.
func map<NewValue>(
    _ transform: @escaping (Value) -> NewValue
) -> Signal<NewValue, Failure> {
    Signal<NewValue, Failure> { demand, cancelation, output, completion in
        let forwardTransformed: (Value) -> Void = { value in
            output(transform(value))
        }
        self.source(demand, cancelation, forwardTransformed, completion)
    }
}
/// Signal that turns every value of this signal into a nested signal (using `transform`)
/// and emits values of all nested signals.
///
/// This (outer) signal is always requested with `.unlimited` demand; nested signals are
/// requested with the demand of the resulting signal and share its cancelation token.
/// With `.upTo(n)` demand values of all nested signals are counted together.
/// A failure of the outer or of any nested signal completes the result with `.failed`.
/// The result finishes when the pending counter (the outer signal plus every started
/// nested signal) drops to zero on a `.finished` completion.
/// - Parameter transform: Closure creating a nested signal for a value.
func flatMap<NewValue>(
    _ transform: @escaping (Value) -> Signal<NewValue, Failure>
) -> Signal<NewValue, Failure> {
    Signal<NewValue, Failure> { demand, cancelation, output, completion in
        let isCompleted = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only if nothing has completed this request yet.
        func completeOnce(_ result: SignalDemandResult<Failure>) {
            guard !isCompleted.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(result)
        }

        // Starts at 1, which stands for the outer signal.
        let pendingCount = ManagedAtomic<Int>(1)
        // Decrements the counter and finishes when nothing is pending anymore.
        func finishIfNeeded() {
            let pending = pendingCount.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent)
            if pending <= 0 {
                completeOnce(.finished)
            }
        }

        // Returns `true` while the requested demand still allows delivering a value.
        let takeDemand: () -> Bool
        switch demand {
        case .unlimited:
            takeDemand = { true }
        case let .upTo(count):
            let remainingCount = ManagedAtomic<UInt>(count)
            takeDemand = {
                remainingCount.loadThenWrappingDecrement(ordering: .sequentiallyConsistent) > 0
            }
        }

        self.source(
            .unlimited,
            cancelation,
            { outerValue in
                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                guard !isCompleted.load(ordering: .sequentiallyConsistent) else { return }
                // One more nested signal is pending from now on.
                pendingCount.wrappingIncrement(ordering: .sequentiallyConsistent)
                transform(outerValue)
                    .source(
                        demand,
                        cancelation,
                        { innerValue in
                            guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                            guard takeDemand() else { return completeOnce(.finished) }
                            guard !isCompleted.load(ordering: .sequentiallyConsistent) else { return }
                            output(innerValue)
                        },
                        { innerResult in
                            switch innerResult {
                            case .finished:
                                // Other nested signals or the outer signal may still be running.
                                finishIfNeeded()
                            case .canceled:
                                // Only the counter is updated here, nothing is completed.
                                // NOTE(review): if the outer signal has already finished, this path
                                // seems to leave the result without any completion - confirm intent.
                                pendingCount.wrappingDecrement(ordering: .sequentiallyConsistent)
                            case let .failed(error):
                                // TODO: verify if we shouldn't postpone passing failure from inner until outer completes
                                completeOnce(.failed(error))
                            }
                        }
                    )
            },
            { outerResult in
                switch outerResult {
                case .finished:
                    // Nested signals may still be running after the outer signal ends.
                    finishIfNeeded()
                case .canceled:
                    completeOnce(.canceled)
                case let .failed(error):
                    completeOnce(.failed(error))
                }
            }
        )
    }
}
/// Signal that turns every value of this signal into a nested signal (using `transform`)
/// and emits values of the most recently created nested signal only.
///
/// This (outer) signal is always requested with `.unlimited` demand. Every new outer value
/// cancels the token of the previous nested signal; the new nested signal is requested with
/// the demand of the resulting signal and a token combining the request token with its own one.
/// Values coming from a nested signal that has been replaced are dropped and do not complete
/// the result. A failure of the outer or of any nested signal completes the result with `.failed`.
/// The result finishes when the outer signal and every started nested signal have ended.
/// - Parameter transform: Closure creating a nested signal for a value.
func flatMapLatest<NewValue>(
    _ transform: @escaping (Value) -> Signal<NewValue, Failure>
) -> Signal<NewValue, Failure> {
    Signal<NewValue, Failure> { demand, cancelation, output, completion in
        let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only if nothing has completed this request yet.
        func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
            guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(completionResult)
        }

        // Starts at 1, which stands for the outer signal.
        let finishCounter = ManagedAtomic<Int>(1)
        // Decrements the counter and finishes when nothing is pending anymore.
        func finishIfNeeded() {
            guard finishCounter.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0 else { return }
            completeOnce(.finished)
        }

        // Returns `true` while the requested demand still allows delivering a value.
        let claimDemand: () -> Bool
        switch demand {
        case .unlimited:
            claimDemand = { true }
        case let .upTo(count):
            let remainingCount = ManagedAtomic<UInt>(count)
            claimDemand = {
                // Saturating decrement: the counter stops at zero instead of wrapping
                // around to `UInt.max`, which could let further values through.
                var current = remainingCount.load(ordering: .sequentiallyConsistent)
                while current > 0 {
                    let (exchanged, original) = remainingCount.compareExchange(
                        expected: current,
                        desired: current - 1,
                        ordering: .sequentiallyConsistent
                    )
                    if exchanged { return true }
                    current = original
                }
                return false
            }
        }

        // Token of the latest nested signal; the initial one is already canceled.
        let latestCancelation = ManagedAtomic<Box<Cancelation>>(Box(.canceled))

        self.source(
            .unlimited,
            cancelation,
            { value in
                guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                // Install the token of the new nested signal and cancel the previous one.
                let localCancelation = Cancelation.manual
                latestCancelation
                    .exchange(
                        Box(localCancelation),
                        ordering: .sequentiallyConsistent
                    )
                    .value
                    .cancel()
                let combinedCancelation = Cancelation.combined(cancelation, localCancelation)
                finishCounter.wrappingIncrement(ordering: .sequentiallyConsistent)
                transform(value)
                    .source(
                        demand,
                        combinedCancelation,
                        { value in
                            // Only cancelation of the whole request completes the result...
                            guard !cancelation.isCanceled else { return completeOnce(.canceled) }
                            // ...a replaced nested signal just has its late values dropped.
                            guard !localCancelation.isCanceled else { return }
                            guard claimDemand() else { return completeOnce(.finished) }
                            guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                            output(value)
                        },
                        { result in
                            switch result {
                            case .finished:
                                // Newer nested signals or the outer signal may still be running.
                                finishIfNeeded()
                            case .canceled:
                                if cancelation.isCanceled {
                                    completeOnce(.canceled)
                                } else {
                                    // Replaced by a newer nested signal: count it as ended, and
                                    // finish if it happened to be the last pending piece of work.
                                    finishIfNeeded()
                                }
                            case let .failed(error):
                                // TODO: verify if we shouldn't postpone passing failure from inner until outer completes
                                completeOnce(.failed(error))
                            }
                        }
                    )
            },
            { result in
                switch result {
                case .finished:
                    // Nested signals may still be running after the outer signal ends.
                    finishIfNeeded()
                case .canceled:
                    completeOnce(.canceled)
                case let .failed(error):
                    completeOnce(.failed(error))
                }
            }
        )
    }
}
/// Signal that delivers values and the completion of this signal through the given scheduler.
///
/// A scheduled value is not delivered when the token is canceled by the time the task runs;
/// a scheduled completion is delivered as `.canceled` in that case.
/// - Parameter scheduler: Scheduler used for delivering values and completion.
func `switch`(
    to scheduler: Scheduler
) -> Self {
    Self { demand, cancelation, output, completion in
        let scheduleValue: (Value) -> Void = { value in
            scheduler.schedule {
                // we wont deliver scheduled values after cancelation
                if cancelation.isCanceled { return }
                output(value)
            }
        }
        let scheduleCompletion: (SignalDemandResult<Failure>) -> Void = { completionResult in
            scheduler.schedule {
                // we wont deliver scheduled completion after cancelation
                guard !cancelation.isCanceled else {
                    return completion(.canceled)
                }
                completion(completionResult)
            }
        }
        self.source(demand, cancelation, scheduleValue, scheduleCompletion)
    }
}
}
public extension Signal {
    /// Signal that emits only those values of this signal for which `filter` returns `true`.
    /// Demand, cancelation token and completion are passed to this signal unchanged.
    /// - Parameter filter: Predicate deciding whether a value is delivered.
    func filter( _ filter: @escaping (Value) -> Bool) -> Self {
        Self { demand, cancelation, output, completion in
            let forwardAccepted: (Value) -> Void = { value in
                if filter(value) {
                    output(value)
                }
            }
            self.source(demand, cancelation, forwardAccepted, completion)
        }
    }
}
public extension Signal {
    /// Not implemented yet: calling this function always stops the program with `fatalError`.
    /// - Parameter bufferSize: Currently unused.
    func shared(bufferSize: UInt) -> Self {
        fatalError("TODO: to complete - key component")
        // Unfinished sketch of the intended implementation, kept for reference:
        //
        // #warning("TODO: locking")
        // var buffer = Array<Value>()
        // buffer.reserveCapacity(Int(bufferSize))
        // var completionResult: SignalDemandResult<Failure>?
        // self.source(
        //     .unlimited,
        //     .never,
        //     { value in
        //         if buffer.count >= bufferSize {
        //             buffer.removeFirst()
        //         } else { /* */ }
        //         buffer.append(value)
        //     },
        //     { result in
        //         completionResult = result
        //     }
        // )
        // return Self { demand, cancelation, output, completion in
        //     guard !cancelation.isCanceled else { return completion(.canceled) }
        //     switch demand {
        //     case let .upTo(count):
        //     case .unlimited:
        //     }
        // }
    }
}
/// Signal that emits values of all given signals.
///
/// Every given signal is requested with the demand of the resulting signal and a token
/// combining the request token with an internal one. With `.upTo(n)` demand values of all
/// signals are counted together. The result completes with `.finished` once every given
/// signal has finished (immediately when no signals are given), or earlier when the demand
/// is exhausted. The first failure or cancelation completes the result right away.
/// When the result completes, the internal token is canceled to stop the remaining signals.
/// - Parameter signals: Signals to merge.
public func merge<Value, Failure: Error>(
    _ signals: Signal<Value, Failure>...
) -> Signal<Value, Failure> {
    return Signal<Value, Failure> { demand, cancelation, output, completion in
        guard !cancelation.isCanceled else { return completion(.canceled) }
        // Without any signal nothing would ever report completion, so finish right away.
        guard !signals.isEmpty else { return completion(.finished) }

        let localCancelation = Cancelation.manual
        let combinedCancelation = Cancelation.combined(cancelation, localCancelation)
        let completed = ManagedAtomic<Bool>(false) // we might also try out atomic_flag here
        // Forwards the result only once and always cancels the remaining signals.
        func completeOnce(_ completionResult: SignalDemandResult<Failure>) {
            defer { localCancelation.cancel() }
            guard !completed.exchange(true, ordering: .sequentiallyConsistent) else { return }
            completion(completionResult)
        }

        // Number of merged signals which have not finished yet.
        let unfinishedCount = ManagedAtomic<Int>(signals.count)

        // Returns `true` while the requested demand still allows delivering a value.
        let claimDemand: () -> Bool
        switch demand {
        case .unlimited:
            claimDemand = { true }
        case let .upTo(count):
            let remainingCount = ManagedAtomic<UInt>(count)
            claimDemand = {
                // Saturating decrement: the counter stops at zero instead of wrapping
                // around to `UInt.max`, which could let further values through.
                var current = remainingCount.load(ordering: .sequentiallyConsistent)
                while current > 0 {
                    let (exchanged, original) = remainingCount.compareExchange(
                        expected: current,
                        desired: current - 1,
                        ordering: .sequentiallyConsistent
                    )
                    if exchanged { return true }
                    current = original
                }
                return false
            }
        }

        for signal in signals {
            signal
                .source(
                    demand,
                    combinedCancelation,
                    { value in
                        guard !combinedCancelation.isCanceled else { return completeOnce(.canceled) }
                        guard claimDemand() else { return completeOnce(.finished) }
                        guard !completed.load(ordering: .sequentiallyConsistent) else { return }
                        output(value)
                    },
                    { completionResult in
                        switch completionResult {
                        case .finished:
                            // One signal running out of values must not end the others.
                            guard unfinishedCount.wrappingDecrementThenLoad(ordering: .sequentiallyConsistent) <= 0
                            else { return }
                            completeOnce(.finished)
                        case .canceled, .failed:
                            completeOnce(completionResult)
                        }
                    }
                )
        }
    }
}
import protocol Atomics.AtomicReference
/// Reference wrapper around an immutable value, conforming to `AtomicReference`.
/// In this file it is used to keep a `Cancelation` inside a `ManagedAtomic`.
private final class Box<Value>: AtomicReference {
    /// The wrapped value.
    fileprivate let value: Value

    /// Wraps the given value.
    fileprivate init(_ wrapped: Value) {
        value = wrapped
    }
}
/// Amount of values requested from a signal.
public enum SignalDemand: Hashable, Comparable {
    /// At most the given number of values.
    case upTo(UInt)
    /// No limit on the number of values.
    case unlimited

    /// Orders demands by size: `.upTo` demands are compared by their counts
    /// and every `.upTo` demand is smaller than `.unlimited`.
    public static func < (lhs: SignalDemand, rhs: SignalDemand) -> Bool {
        // `.unlimited` on the left is never smaller than anything.
        guard case let .upTo(lhsCount) = lhs else { return false }
        // A limited demand is always smaller than `.unlimited`.
        guard case let .upTo(rhsCount) = rhs else { return true }
        return lhsCount < rhsCount
    }
}
/// Result passed to the completion callback of a signal request.
public enum SignalDemandResult<Failure: Error> {
    /// The request ended without a failure.
    case finished
    /// The request ended with the given failure.
    case failed(Failure)
    /// The request ended because of cancelation.
    case canceled
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment