Last active
March 9, 2019 18:34
-
-
Save kaqu/e3c0a5110ca9b8dfaa6fd906f8bd645a to your computer and use it in GitHub Desktop.
PasteIn Future implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if os(Linux) | |
import Glibc | |
#else | |
import Darwin | |
#endif | |
/// A single-assignment container for a value that may become available later.
///
/// Handlers registered through `then` are stored until the value is set via
/// `become`; handlers registered afterwards are delivered the stored value.
/// When an executor is supplied, handler invocations are routed through it,
/// otherwise they are called inline by whichever call delivers the value.
/// Access to the mutable state is guarded by a recursive pthread mutex that
/// is created in the designated initializer and destroyed in `deinit`.
public final class Future<Value> {

    /// Callback receiving the future's value.
    public typealias Handler = (Value) -> Void

    /// Function that schedules a unit of work for execution.
    public typealias Executor = (@escaping () -> Void) -> Void

    // Heap-allocated so the mutex has a stable address for the pthread API.
    private let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    // Handlers waiting for the value; emptied once the value is delivered.
    private var handlers: [Handler] = []
    // Optional scheduling function used to invoke handlers.
    private let executor: Executor?
    // `nil` until the future has been completed.
    private var value: Value?

    /// Creates a pending future.
    ///
    /// - Parameter executor: Scheduling function used for handler invocation,
    ///   or `nil` to invoke handlers inline.
    fileprivate init(executor: Executor? = nil) {
        self.executor = executor
        initMutex(mtx)
    }

    /// Creates a future that already holds `value`.
    ///
    /// - Parameters:
    ///   - value: The value the future is completed with.
    ///   - executor: Optional executor whose `execute(task:)` is used to run handlers.
    public convenience init(with value: Value, executor: FutureExecutor? = nil) {
        let schedule: Executor? = executor?.execute
        self.init(executor: schedule)
        self.value = value
    }

    /// Creates a future completed with the result of `task`, which is
    /// submitted to `executor`.
    ///
    /// - Parameters:
    ///   - executor: Executor that runs `task` and later the handlers.
    ///   - task: Work producing the future's value.
    public convenience init(executor: FutureExecutor, _ task: @escaping () -> Value) {
        self.init(executor: executor.execute)
        // The closure keeps the future alive until the task has delivered its value.
        executor.execute(task: { self.become(task()) })
    }

    /// Creates a pending future together with the function that completes it.
    ///
    /// - Parameter executor: Optional executor whose `execute(task:)` is used to run handlers.
    /// - Returns: The pending future and a `complete` function that sets its value.
    public static func completable(executor: FutureExecutor? = nil) -> (future: Future<Value>, complete: (Value) -> Void) {
        let pending = Future<Value>(executor: executor?.execute)
        return (future: pending, complete: pending.become)
    }

    deinit {
        // Destroys the mutex and frees its storage.
        deinitMutex(mtx)
    }
}
public extension Future {

    /// Registers `handler` to receive the future's value.
    ///
    /// If the value is already present, the handler is dispatched right away:
    /// through the executor when there is one, otherwise inline (while the
    /// future's lock is still held). If the value is not yet present, the
    /// handler is stored for later delivery.
    ///
    /// - Parameter handler: Callback receiving the value.
    /// - Returns: The same future, to allow chaining.
    @discardableResult
    func then(_ handler: @escaping Handler) -> Self {
        pthread_mutex_lock(mtx)
        defer { pthread_mutex_unlock(mtx) }

        guard let resolved = value else {
            // Not completed yet: keep the handler until `become` is called.
            handlers.append(handler)
            return self
        }

        if let schedule = executor {
            schedule { handler(resolved) }
        } else {
            handler(resolved)
        }
        return self
    }

    /// Returns a future holding the result of applying `transformation` to this future's value.
    ///
    /// - Parameter transformation: Function applied to the value once available.
    /// - Returns: A new future (without executor) completed with the transformed value.
    func map<T>(_ transformation: @escaping (Value) -> (T)) -> Future<T> {
        let result = Future<T>()
        then { result.become(transformation($0)) }
        return result
    }

    /// Returns a future completed with the value of the future produced by `transformation`.
    ///
    /// - Parameter transformation: Function producing a follow-up future from the value.
    /// - Returns: A new future (without executor) completed when the follow-up future completes.
    func flatMap<T>(_ transformation: @escaping (Value) -> (Future<T>)) -> Future<T> {
        let result = Future<T>()
        then { value in
            let inner = transformation(value)
            inner.then { result.become($0) }
        }
        return result
    }

    /// Returns a future carrying the same value whose handlers are run through `executor`.
    ///
    /// - Parameter executor: Scheduling function for the returned future's handlers.
    /// - Returns: A new future completed with this future's value.
    func `switch`(to executor: @escaping Executor) -> Future<Value> {
        let switched = Future<Value>(executor: executor)
        then { value in switched.become(value) }
        return switched
    }
}
internal extension Future {

    /// Completes the future with `value` and delivers it to the stored handlers.
    ///
    /// Only the first call has an effect; if a value is already present the
    /// call returns without doing anything. Stored handlers are removed from
    /// the future and then invoked, either inside a single task submitted to
    /// the executor or, when there is no executor, inline while the lock is held.
    ///
    /// - Parameter value: The value to complete the future with.
    func become(_ value: Value) {
        pthread_mutex_lock(mtx)
        defer { pthread_mutex_unlock(mtx) }

        // Already completed: ignore any further value.
        if case .some = self.value { return }
        self.value = value

        // Snapshot and clear, so the future no longer retains the handlers.
        let pending = handlers
        handlers = []

        if let schedule = executor {
            schedule {
                for handler in pending { handler(value) }
            }
        } else {
            for handler in pending { handler(value) }
        }
    }
}
/// Combines an array of futures into one future of all their values.
///
/// The resulting array is positional: element `i` is the value of
/// `futures[i]`, regardless of the order in which the inputs complete.
/// An empty input yields an already completed future holding `[]`.
///
/// - Parameter futures: The futures to wait for.
/// - Returns: A future (without executor) completed once every input has a value.
public func zip<T>(_ futures: [Future<T>]) -> Future<[T]> {
    guard !futures.isEmpty else { return .init(with: []) }
    let resultFuture: Future<[T]> = .init()
    // One slot per input so each value lands at its input's position.
    var slots: [T?] = Array(repeating: nil, count: futures.count)
    var remaining = futures.count
    // Guards `slots` and `remaining`; it is destroyed by the handler that
    // observes the last completion, so it is only released once all inputs complete.
    let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    initMutex(mtx)
    for (index, future) in futures.enumerated() {
        future.then { value in
            pthread_mutex_lock(mtx)
            slots[index] = value
            remaining -= 1
            guard remaining == 0 else {
                pthread_mutex_unlock(mtx)
                return
            }
            // Every slot is filled at this point; unwrap one optional level only.
            let values = slots.compactMap { $0 }
            // A mutex must be unlocked before it is destroyed; no other handler
            // will touch it anymore because all of them have already run.
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become(values)
        }
    }
    return resultFuture
}
/// Combines two futures into one future of a pair of their values.
///
/// - Parameters:
///   - f1: Future providing the first element.
///   - f2: Future providing the second element.
/// - Returns: A future (without executor) completed once both inputs have a value.
public func zip<T1, T2>(_ f1: Future<T1>, _ f2: Future<T2>) -> Future<(T1, T2)> {
    let resultFuture: Future<(T1, T2)> = .init()
    var results: (T1?, T2?) = (.none, .none)
    // Guards `results`; it is destroyed by whichever handler runs second,
    // so it is only released once both inputs complete.
    let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    initMutex(mtx)
    f1.then { v1 in
        pthread_mutex_lock(mtx)
        if case let (_, .some(v2)) = results {
            // A mutex must be unlocked before it is destroyed; the other
            // handler has already finished, so nobody will lock it again.
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become((v1, v2))
        } else {
            results.0 = v1
            pthread_mutex_unlock(mtx)
        }
    }
    f2.then { v2 in
        pthread_mutex_lock(mtx)
        if case let (.some(v1), _) = results {
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become((v1, v2))
        } else {
            results.1 = v2
            pthread_mutex_unlock(mtx)
        }
    }
    return resultFuture
}
/// Combines three futures into one future of a triple of their values.
///
/// - Parameters:
///   - f1: Future providing the first element.
///   - f2: Future providing the second element.
///   - f3: Future providing the third element.
/// - Returns: A future (without executor) completed once all inputs have a value.
public func zip<T1, T2, T3>(_ f1: Future<T1>, _ f2: Future<T2>, _ f3: Future<T3>) -> Future<(T1, T2, T3)> {
    let resultFuture: Future<(T1, T2, T3)> = .init()
    var results: (T1?, T2?, T3?) = (.none, .none, .none)
    // Guards `results`; it is destroyed by whichever handler runs last,
    // so it is only released once all inputs complete.
    let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    initMutex(mtx)
    f1.then { v1 in
        pthread_mutex_lock(mtx)
        if case let (_, .some(v2), .some(v3)) = results {
            // A mutex must be unlocked before it is destroyed; the other
            // handlers have already finished, so nobody will lock it again.
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become((v1, v2, v3))
        } else {
            results.0 = v1
            pthread_mutex_unlock(mtx)
        }
    }
    f2.then { v2 in
        pthread_mutex_lock(mtx)
        if case let (.some(v1), _, .some(v3)) = results {
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become((v1, v2, v3))
        } else {
            results.1 = v2
            pthread_mutex_unlock(mtx)
        }
    }
    f3.then { v3 in
        pthread_mutex_lock(mtx)
        if case let (.some(v1), .some(v2), _) = results {
            pthread_mutex_unlock(mtx)
            deinitMutex(mtx)
            resultFuture.become((v1, v2, v3))
        } else {
            results.2 = v3
            pthread_mutex_unlock(mtx)
        }
    }
    return resultFuture
}
/// Something able to run a unit of work on behalf of a `Future`.
public protocol FutureExecutor {

    /// Runs, or schedules for running, the given task.
    ///
    /// - Parameter task: The work to execute.
    func execute(task: @escaping () -> Void)
}
#if os(Linux) | |
import Dispatch | |
#else | |
import Foundation | |
extension OperationQueue: FutureExecutor {

    /// Runs `task` on this queue.
    ///
    /// When the caller is already executing on this queue the task is invoked
    /// synchronously; otherwise it is added to the queue as an operation.
    ///
    /// - Parameter task: The work to execute.
    public func execute(task: @escaping () -> Void) {
        let alreadyOnThisQueue = OperationQueue.current == self
        if alreadyOnThisQueue {
            task()
            return
        }
        addOperation(task)
    }
}
#endif | |
// Declares the `FutureExecutor` conformance so a dispatch queue can be passed
// wherever a `FutureExecutor` is expected; `execute(task:)` already satisfies
// the protocol requirement.
extension DispatchQueue: FutureExecutor {

    /// Submits `task` to this queue asynchronously.
    ///
    /// - Parameter task: The work to execute.
    public func execute(task: @escaping () -> Void) -> Void {
        self.async(execute: task)
    }
}
/// Initializes the mutex at `ref` as a recursive, process-private pthread mutex.
///
/// Traps via `preconditionFailure()` when the attribute object or the mutex
/// itself cannot be initialized.
///
/// - Parameter ref: Allocated storage for the mutex to initialize.
fileprivate func initMutex(_ ref: UnsafeMutablePointer<pthread_mutex_t>) {
    let attributes = UnsafeMutablePointer<pthread_mutexattr_t>.allocate(capacity: 1)
    guard pthread_mutexattr_init(attributes) == 0 else { preconditionFailure() }
    // The attribute object is only needed while the mutex is being created.
    defer {
        pthread_mutexattr_destroy(attributes)
        attributes.deinitialize(count: 1)
        attributes.deallocate()
    }

    // The constants are imported with different integer types per platform;
    // the explicit conversion is valid for both.
    pthread_mutexattr_settype(attributes, Int32(PTHREAD_MUTEX_RECURSIVE))
    pthread_mutexattr_setpshared(attributes, Int32(PTHREAD_PROCESS_PRIVATE))

    guard pthread_mutex_init(ref, attributes) == 0 else { preconditionFailure() }
}
/// Destroys the mutex at `mutex` and releases its storage.
///
/// The pointer must not be used after this call.
///
/// - Parameter mutex: Mutex storage previously allocated and initialized.
fileprivate func deinitMutex(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) {
    // Storage is released after the pthread object has been destroyed.
    defer {
        mutex.deinitialize(count: 1)
        mutex.deallocate()
    }
    pthread_mutex_destroy(mutex)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment