Skip to content

Instantly share code, notes, and snippets.

@kaqu
Last active March 9, 2019 18:34
Show Gist options
  • Save kaqu/e3c0a5110ca9b8dfaa6fd906f8bd645a to your computer and use it in GitHub Desktop.
Save kaqu/e3c0a5110ca9b8dfaa6fd906f8bd645a to your computer and use it in GitHub Desktop.
PasteIn Future implementation
#if os(Linux)
import Glibc
#else
import Darwin
#endif
public final class Future<Value> {
public typealias Handler = (Value) -> Void
public typealias Executor = (@escaping () -> Void) -> Void
private let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
private var handlers: [Handler] = []
private let executor: Executor?
private var value: Value?
fileprivate init(executor: Executor? = nil) {
self.executor = executor
initMutex(mtx)
}
public convenience init(with value: Value, executor: FutureExecutor? = nil) {
self.init(executor: executor?.execute)
self.value = value
}
public convenience init(executor: FutureExecutor, _ task: @escaping () -> Value) {
self.init(executor: executor.execute)
executor.execute { self.become(task()) }
}
public static func completable(executor: FutureExecutor? = nil) -> (future: Future<Value>, complete: (Value) -> Void) {
let future: Future<Value> = .init(executor: executor?.execute)
return (future: future, complete: future.become)
}
deinit {
deinitMutex(mtx)
}
}
public extension Future {
@discardableResult
func then(_ handler: @escaping Handler) -> Self {
pthread_mutex_lock(mtx)
defer { pthread_mutex_unlock(mtx)}
switch (value, executor) {
case let (.some(value), .some(executor)):
executor { handler(value) }
case let (.some(value), .none):
handler(value)
case (.none, _):
handlers.append(handler)
}
return self
}
func map<T>(_ transformation: @escaping (Value) -> (T)) -> Future<T> {
let mapped: Future<T> = .init()
then { value in
mapped.become(transformation(value))
}
return mapped
}
func flatMap<T>(_ transformation: @escaping (Value) -> (Future<T>)) -> Future<T> {
let mapped: Future<T> = .init()
then { value in
transformation(value).then(mapped.become)
}
return mapped
}
func `switch`(to executor: @escaping Executor) -> Future<Value> {
let future: Future<Value> = .init(executor: executor)
then { future.become($0) }
return future
}
}
internal extension Future {
func become(_ value: Value) {
pthread_mutex_lock(mtx)
defer { pthread_mutex_unlock(mtx)}
guard case .none = self.value else { return }
self.value = value
switch executor {
case .none:
self.handlers.forEach { $0(value) }
self.handlers = []
case let .some(executor):
let handlers = self.handlers
self.handlers = []
executor {
handlers.forEach { $0(value) }
}
}
}
}
public func zip<T>(_ futures: [Future<T>]) -> Future<[T]> {
guard !futures.isEmpty else { return .init(with: []) }
let resultFuture: Future<[T]> = .init()
let count: Int = futures.count
var results: Array<T> = .init()
results.reserveCapacity(futures.count)
let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
initMutex(mtx)
for future in futures {
future.then {
pthread_mutex_lock(mtx)
results.append($0)
guard results.count == count else { pthread_mutex_unlock(mtx) ; return }
resultFuture.become(results)
deinitMutex(mtx)
}
}
return resultFuture
}
public func zip<T1, T2>(_ f1: Future<T1>, _ f2: Future<T2>) -> Future<(T1, T2)> {
let resultFuture: Future<(T1, T2)> = .init()
var results: (T1?, T2?) = (.none, .none)
let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
initMutex(mtx)
f1.then {
pthread_mutex_lock(mtx)
if case let (_, .some(v2)) = results {
resultFuture.become(($0, v2))
deinitMutex(mtx)
} else {
results = ($0, .none)
pthread_mutex_unlock(mtx)
}
}
f2.then {
pthread_mutex_lock(mtx)
if case let (.some(v1), _) = results {
resultFuture.become((v1, $0))
deinitMutex(mtx)
} else {
results = (.none, $0)
pthread_mutex_unlock(mtx)
}
}
return resultFuture
}
public func zip<T1, T2, T3>(_ f1: Future<T1>, _ f2: Future<T2>, _ f3: Future<T3>) -> Future<(T1, T2, T3)> {
let resultFuture: Future<(T1, T2, T3)> = .init()
var results: (T1?, T2?, T3?) = (.none, .none, .none)
let mtx = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
initMutex(mtx)
f1.then {
pthread_mutex_lock(mtx)
if case let (_, .some(v2), .some(v3)) = results {
resultFuture.become(($0, v2, v3))
deinitMutex(mtx)
} else {
results = ($0, results.1, results.2)
pthread_mutex_unlock(mtx)
}
}
f2.then {
pthread_mutex_lock(mtx)
if case let (.some(v1), _, .some(v3)) = results {
resultFuture.become((v1, $0, v3))
deinitMutex(mtx)
} else {
results = (results.0, $0, results.2)
pthread_mutex_unlock(mtx)
}
}
f3.then {
pthread_mutex_lock(mtx)
if case let (.some(v1), .some(v2), _) = results {
resultFuture.become((v1, v2, $0))
deinitMutex(mtx)
} else {
results = (results.0, results.1, $0)
pthread_mutex_unlock(mtx)
}
}
return resultFuture
}
public protocol FutureExecutor {
func execute(task: @escaping () -> Void) -> Void
}
#if os(Linux)
import Dispatch
#else
import Foundation
extension OperationQueue: FutureExecutor {
public func execute(task: @escaping () -> Void) -> Void {
if OperationQueue.current == self {
task()
} else {
self.addOperation(task)
}
}
}
#endif
extension DispatchQueue {
public func execute(task: @escaping () -> Void) -> Void {
self.async(execute: task)
}
}
fileprivate func initMutex(_ ref: UnsafeMutablePointer<pthread_mutex_t>) {
let attr = UnsafeMutablePointer<pthread_mutexattr_t>.allocate(capacity: 1)
guard pthread_mutexattr_init(attr) == 0 else { preconditionFailure() }
#if os(Linux)
pthread_mutexattr_settype(attr, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutexattr_setpshared(attr, Int32(PTHREAD_PROCESS_PRIVATE))
#else
pthread_mutexattr_settype(attr, PTHREAD_MUTEX_RECURSIVE)
pthread_mutexattr_setpshared(attr, PTHREAD_PROCESS_PRIVATE)
#endif
guard pthread_mutex_init(ref, attr) == 0 else { preconditionFailure() }
pthread_mutexattr_destroy(attr)
attr.deinitialize(count: 1)
attr.deallocate()
}
fileprivate func deinitMutex(_ ref: UnsafeMutablePointer<pthread_mutex_t>) {
pthread_mutex_destroy(ref)
ref.deinitialize(count: 1)
ref.deallocate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment