Last active
July 16, 2020 11:00
-
-
Save karwa/43ae838809cc68d317003f2885c71572 to your computer and use it in GitHub Desktop.
Concurrent collection wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Dispatch | |
// Import C11 for atomic_flag | |
// FIXME: SWIFT(canImport) | |
//#if canImport(Glibc) | |
// import Glibc.stdatomic | |
//#elseif canImport(Darwin) | |
import Darwin.C.stdatomic | |
//#endif | |
extension RandomAccessCollection {
  /// A wrapper around this collection whose `forEach`, `map`, `flatMap`,
  /// `filter` and `contains(where:)` run their closures concurrently.
  ///
  /// Indexing and iteration on the wrapper behave as they do on `self`.
  public var concurrent: ConcurrentCollection<Self> {
    let view = ConcurrentCollection<Self>(viewing: self)
    return view
  }
}
/// A random-access view of a collection `C` whose `forEach`, `map`, `flatMap`,
/// `filter` and `contains(where:)` run their closures concurrently.
///
/// Indices, subscripting, iteration and index arithmetic are all forwarded to
/// the wrapped collection, so the view exposes exactly the same elements.
public struct ConcurrentCollection<C>: RandomAccessCollection where C: RandomAccessCollection {
  /// The collection being viewed.
  @usableFromInline let wrapped: C

  /// Creates a concurrent view of `wrapped`.
  @_transparent
  public init(viewing wrapped: C) { self.wrapped = wrapped }

  // MARK: Indices

  @_transparent
  public var startIndex: C.Index { return wrapped.startIndex }

  @_transparent
  public var endIndex: C.Index { return wrapped.endIndex }

  @_transparent
  public var count: C.IndexDistance { return wrapped.count }

  @_transparent
  public func index(before i: C.Index) -> C.Index {
    return wrapped.index(before: i)
  }

  @_transparent
  public func index(after i: C.Index) -> C.Index {
    return wrapped.index(after: i)
  }

  @_transparent
  public func index(_ i: C.Index, offsetBy n: C.IndexDistance) -> C.Index {
    return wrapped.index(i, offsetBy: n)
  }

  @_transparent
  public func index(_ i: C.Index, offsetBy n: C.IndexDistance, limitedBy limit: C.Index) -> C.Index? {
    return wrapped.index(i, offsetBy: n, limitedBy: limit)
  }

  /// Forwarded like the other index operations.
  ///
  /// `RandomAccessCollection` requires `distance(from:to:)` to be O(1). When
  /// `C.Index` is not `Strideable`, no O(1) default exists, so without this
  /// forwarding the view would fall back to walking indices one at a time.
  @_transparent
  public func distance(from start: C.Index, to end: C.Index) -> C.IndexDistance {
    return wrapped.distance(from: start, to: end)
  }

  // MARK: Iteration

  @_transparent
  public func makeIterator() -> C.Iterator {
    return wrapped.makeIterator()
  }

  // MARK: Subscript

  public subscript(position: C.Index) -> C.Iterator.Element {
    @_transparent
    get { return wrapped[position] }
  }
}
// Primitive concurrent operations.
extension ConcurrentCollection {
  // Both primitives take a `rethrowHack` closure. A `rethrows` function may only
  // throw errors that come from one of its closure parameters. Here the error is
  // captured inside `concurrentPerform`'s non-throwing closure, so it cannot be
  // rethrown directly. Instead it is handed to `rethrowHack`, which callers
  // implement as `{ throw $0 }`. This exploits a loophole in the compiler's
  // `rethrows` checking; `DispatchQueue.sync` uses the same trick internally.
  //
  // The "first outcome wins" flag is heap-allocated. Passing `&someLocal` to a
  // C atomic from several threads would be overlapping Swift inout accesses,
  // and Swift does not guarantee such a pointer refers to one stable address.

  /// Concurrently executes `body` with every integer in `0..<count`.
  ///
  /// If any invocation throws, one of the thrown errors is passed to
  /// `rethrowHack` once all invocations have finished. If multiple errors are
  /// thrown, which one is re-thrown is unspecified: it is not necessarily the
  /// 'first' or the 'last' one.
  ///
  /// Every offset is visited, even after some invocation has thrown.
  /// `atomic_flag` offers no load operation, so an iteration cannot check for
  /// an earlier failure without a data race.
  @inline(__always)
  fileprivate func _forEach(_ body: (Int) throws -> Void, rethrowHack: (Error) throws -> Never) rethrows {
    var firstError: Error? = nil
    let claimed = UnsafeMutablePointer<atomic_flag>.allocate(capacity: 1)
    claimed.initialize(to: atomic_flag())
    defer {
      claimed.deinitialize(count: 1)
      claimed.deallocate()
    }
    DispatchQueue.concurrentPerform(iterations: numericCast(count)) { offset in
      do {
        try body(offset)
      } catch {
        // Only the thread that sets the flag first records its error, so
        // `firstError` has at most one writer.
        if !atomic_flag_test_and_set(claimed) { firstError = error }
      }
    }
    // `concurrentPerform` returns only after every iteration has finished, so
    // any write to `firstError` is complete and visible here.
    if let error = firstError { try rethrowHack(error) }
  }

  /// Concurrently evaluates `body` for the integers in `0..<count`, in
  /// consecutive batches of 32 offsets.
  ///
  /// Processing stops after the first batch in which some invocation returned
  /// a non-`nil` value or threw. Within that batch, the first invocation to
  /// record an outcome wins; which one that is, is unspecified.
  ///
  /// - Returns: A non-`nil` value produced by `body`, or `nil` if every
  ///   invocation returned `nil` (including when the collection is empty).
  /// - Throws: An error thrown by `body`, re-thrown through `rethrowHack`.
  @inline(__always)
  fileprivate func _batchedForEach<T>(_ body: (Int) throws -> T?, rethrowHack: (Error) throws -> Never) rethrows -> T? {
    let batchSize = 32
    let total: Int = numericCast(count)
    var outcome: _FailableResult<T>? = nil
    let claimed = UnsafeMutablePointer<atomic_flag>.allocate(capacity: 1)
    claimed.initialize(to: atomic_flag())
    defer {
      claimed.deinitialize(count: 1)
      claimed.deallocate()
    }
    var batchStart = 0
    while batchStart < total {
      let base = batchStart
      let batchEnd = Swift.min(base + batchSize, total)
      DispatchQueue.concurrentPerform(iterations: batchEnd - base) { offset in
        do {
          if let value = try body(base + offset), !atomic_flag_test_and_set(claimed) {
            outcome = .result(value)
          }
        } catch {
          if !atomic_flag_test_and_set(claimed) { outcome = .error(error) }
        }
      }
      // The batch has fully completed, so `outcome` can be read without racing.
      // The flag is only ever set together with `outcome`, so if `outcome` is
      // still nil the flag is still clear for the next batch.
      if let finished = outcome {
        switch finished {
        case .result(let value): return value
        case .error(let error): try rethrowHack(error)
        }
      }
      batchStart = batchEnd
    }
    // No invocation produced a value or threw.
    return nil
  }

  /// The outcome recorded by `_batchedForEach`: either a value or an error.
  private enum _FailableResult<Result> {
    case result(Result)
    case error(Error)
  }
}
// Concurrent algorithm implementations.
extension ConcurrentCollection {
  /// Returns the element `offset` positions after `startIndex`.
  ///
  /// The primitives hand out integer offsets; this turns one back into an
  /// element. `index(_:offsetBy:)` is expected to be O(1) because the
  /// collection is random-access.
  @inline(__always)
  fileprivate func _element(atOffset offset: Int) -> C.Iterator.Element {
    return self[index(startIndex, offsetBy: numericCast(offset))]
  }

  /// Calls `body` on every element, concurrently.
  ///
  /// If `body` throws, one of the thrown errors is rethrown after all
  /// concurrent invocations have finished.
  public func forEach(_ body: (C.Iterator.Element) throws -> Void) rethrows {
    try _forEach({ offset in
      try body(self._element(atOffset: offset))
    }, rethrowHack: { throw $0 })
  }

  /// Applies the __non-throwing__ `transform` to every element concurrently and
  /// returns the results in the collection's order.
  public func map<T>(_ transform: (C.Iterator.Element) -> T) -> [T] {
    let n: Int = numericCast(count)
    let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)
    defer {
      buffer.deinitialize(count: n)
      buffer.deallocate()
    }
    // Each iteration initializes only its own slot, so no synchronization is
    // needed. `transform` cannot throw, so every slot is initialized before
    // the copy below.
    _forEach({ offset in
      (buffer + offset).initialize(to: transform(self._element(atOffset: offset)))
    }, rethrowHack: { _ in preconditionFailure() })
    // FIXME: SWIFT(unsafe-array-init): this copies the buffer into the Array.
    // https://bugs.swift.org/browse/SR-3087 (addressed in Swift 5.1 by
    // `Array(unsafeUninitializedCapacity:initializingWith:)`, SE-0245).
    return Array(UnsafeBufferPointer(start: buffer, count: n))
  }

  /// Applies the __throwing__ `transform` to every element concurrently and
  /// returns the results in the collection's order.
  ///
  /// If `transform` throws, one of the thrown errors is rethrown and all
  /// results produced so far are discarded.
  public func map<T>(_ transform: (C.Iterator.Element) throws -> T) rethrows -> [T] {
    let n: Int = numericCast(count)
    // Elements finish out of order and some may never be produced if
    // `transform` throws. Every slot therefore starts as `nil`, so the whole
    // buffer is always initialized and can be torn down uniformly.
    let slots = UnsafeMutablePointer<T?>.allocate(capacity: n)
    slots.initialize(repeating: nil, count: n)
    defer {
      slots.deinitialize(count: n)
      slots.deallocate()
    }
    // Assignment (not `initialize`), because each slot already holds `nil`.
    try _forEach({ offset in
      slots[offset] = try transform(self._element(atOffset: offset))
    }, rethrowHack: { throw $0 })
    // Nothing threw, so every slot was filled; a `nil` here is a logic error.
    return UnsafeBufferPointer(start: slots, count: n).map { $0! }
  }

  /// Applies `transform` to every element concurrently and returns the
  /// non-`nil` results, in the collection's order.
  ///
  /// If `transform` throws, one of the thrown errors is rethrown and all
  /// results produced so far are discarded.
  public func flatMap<T>(_ transform: (C.Iterator.Element) throws -> T?) rethrows -> [T] {
    let n: Int = numericCast(count)
    // As in the throwing `map`: pre-fill with `nil` so the buffer is always
    // fully initialized, whichever iterations ran.
    let slots = UnsafeMutablePointer<T?>.allocate(capacity: n)
    slots.initialize(repeating: nil, count: n)
    defer {
      slots.deinitialize(count: n)
      slots.deallocate()
    }
    try _forEach({ offset in
      slots[offset] = try transform(self._element(atOffset: offset))
    }, rethrowHack: { throw $0 })
    return UnsafeBufferPointer(start: slots, count: n).compactMap { $0 }
  }

  /// Evaluates `isIncluded` on every element concurrently and returns the
  /// elements for which it returned `true`, in the collection's order.
  public func filter(_ isIncluded: (C.Iterator.Element) throws -> Bool) rethrows -> [C.Iterator.Element] {
    return try flatMap { element in try isIncluded(element) ? element : nil }
  }

  /// Evaluates `predicate` concurrently, in batches, and returns `true` if it
  /// returned `true` for at least one element.
  ///
  /// Evaluation stops after the first batch that contains a match or a thrown
  /// error.
  public func contains(where predicate: (C.Iterator.Element) throws -> Bool) rethrows -> Bool {
    let match: Bool? = try _batchedForEach({ offset -> Bool? in
      return try predicate(self._element(atOffset: offset)) ? true : nil
    }, rethrowHack: { throw $0 })
    return match != nil
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment