// See commentary below this gist.
import Foundation
import QuartzCore

// Implementation from https://talk.objc.io/episodes/S01E90-concurrent-map
public final class ThreadSafe<A> {
    var _value: A
    let queue = DispatchQueue(label: "ThreadSafe")
    init(_ value: A) { self._value = value }
    var value: A {
        return queue.sync { _value }
    }
    func atomically(_ transform: (inout A) -> ()) {
        queue.sync { transform(&self._value) }
    }
}
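// A quick usage sketch, not part of the original gist: ThreadSafe can
// guard any shared mutable value that concurrentPerform iterations
// update simultaneously. The function name below is made up.
func threadSafeUsageExample() {
    let total = ThreadSafe(0)
    DispatchQueue.concurrentPerform(iterations: 100) { i in
        total.atomically { $0 += i }
    }
    print(total.value) // 4950 == 0 + 1 + ... + 99
}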
extension Array {
    func concurrentMap1<B>(nthreads: Int? = nil, _ transform: (Element) -> B) -> [B] {
        let result = ThreadSafe(Array<B?>(repeating: nil, count: count))
        let nt = nthreads ?? count
        let cs = (count - 1) / nt + 1
        DispatchQueue.concurrentPerform(iterations: nt) { i in
            let min = i * cs
            let max = min + cs > count ? count : min + cs
            for idx in min..<max {
                let element = self[idx]
                let transformed = transform(element)
                result.atomically { $0[idx] = transformed }
            }
        }
        return result.value.map { $0! }
    }
}
// My generic implementation
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap2<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        if batchCount < 2 { return self.map(transform) }
        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress + startOffset ..< baseAddress + endOffset {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }
}
// This oughta be an optimization, but doesn't seem to be!
extension Array {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap3<B>(_ transform: (Element) -> B) -> [B] {
        withUnsafeBufferPointer { $0.concurrentMap2(transform) }
    }
}
// Implementation with no unsafe constructs.
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap4<B>(_ transform: (Element) -> B) -> [B] {
        let batchSize = 4096 // Tune this
        let n = self.count
        let batchCount = (n + batchSize - 1) / batchSize
        if batchCount < 2 { return self.map(transform) }
        let batches = ThreadSafe(
            ContiguousArray<[B]?>(repeating: nil, count: batchCount))
        func batchStart(_ b: Int) -> Index {
            index(startIndex, offsetBy: b * n / batchCount)
        }
        DispatchQueue.concurrentPerform(iterations: batchCount) { b in
            let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
            batches.atomically { $0[b] = batch }
        }
        return batches.value.flatMap { $0! }
    }
}
func test(count: Int, _ transform: (Int) -> Int) {
    let hugeCollection = 0...count
    let hugeArray = Array(hugeCollection)
    func time<R>(_ f: () -> R) -> (time: Double, result: R) {
        let startTime = CACurrentMediaTime()
        let r = f()
        let t = CACurrentMediaTime() - startTime
        return (t, r)
    }
    let (t0, r0) = time { hugeArray.map(transform) }
    print("sequential map time:", t0, "(the one to beat)")
    let (t1, r1) = time { hugeArray.concurrentMap1(transform) }
    print("concurrentMap1 time:", t1)
    let (t2, r2) = time { hugeArray.concurrentMap2(transform) }
    print("concurrentMap2 time:", t2)
    let (t3, r3) = time { hugeArray.concurrentMap3(transform) }
    print("concurrentMap3 time:", t3)
    let (t4, r4) = time { hugeArray.concurrentMap4(transform) }
    print("concurrentMap4 time:", t4)
    if r1 != r0 { fatalError("bad implementation 1") }
    if r2 != r0 { fatalError("bad implementation 2") }
    if r3 != r0 { fatalError("bad implementation 3") }
    if r4 != r0 { fatalError("bad implementation 4") }
}
let N = 65536
print("* Testing a fast operation, to show overhead")
test(count: N * 10) { $0 &+ 1 }
print()
print("* Testing slow operations")
let bigArray = Array(0...N)
for shift in 0..<5 {
    let M = (N >> 4) << shift
    let workload = bigArray.prefix(M)
    print("- workload size: ", workload.count)
    test(count: N) { workload.reduce($0, &+) }
}
Hi @dabrahams!
Thanks for the great implementations and benchmarking!
I wonder if you happen to have any advice for using these asynchronously. In my case, I need concurrent processing for large sets of data, but I also want the interface to stay responsive. My current (very rough) implementation uses concurrentMap2 in the following way:
final class CancellationToken {
    var cancelled: Bool = false
    func cancel() {
        cancelled = true
    }
}

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        if batchCount < 2 { return self.map(transform) }
        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                guard !token.cancelled else {
                    return
                }
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress + startOffset ..< baseAddress + endOffset {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }

    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        DispatchQueue.global(qos: qos).async {
            let result = self.concurrentMap(token: token, transform)
            DispatchQueue.main.async {
                completion(result, token)
            }
        }
    }
}
It works fine in my case; however, I'm pretty sure I'm walking in very dangerous territory, since the Array memory is not fully initialized when the operation is cancelled. The documentation clearly states that "the memory in the range buffer[0..<initializedCount] must be initialized at the end of the closure's execution", but I'm not sure how else to cancel this operation.
Would love to hear any of your thoughts on this!
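One caveat in the code above: the cancelled flag is read and written from multiple threads without any synchronization, which is technically a data race. A minimal sketch of a safer token, reusing the ThreadSafe wrapper from the gist (the name SafeCancellationToken is made up for illustration):

final class SafeCancellationToken {
    // Route all access to the flag through ThreadSafe's serial queue.
    private let _cancelled = ThreadSafe(false)
    var cancelled: Bool { _cancelled.value }
    func cancel() { _cancelled.atomically { $0 = true } }
}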
Update: it turns out my implementation was not actually cancelling anything, and when I did cancel, everything crashed, as expected. :) So far, the only reasonable thing I've come up with is initializing with bogus values instead of running the expensive transform operation:
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), bogusValue: B, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        if batchCount < 2 { return self.map(transform) }
        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress + startOffset ..< baseAddress + endOffset {
                    if token.cancelled {
                        p.initialize(to: bogusValue)
                    } else {
                        p.initialize(to: transform(self[sourceIndex]))
                    }
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }

    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, bogusValue: T, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        DispatchQueue.global(qos: qos).async {
            let result = self.concurrentMap(token: token, bogusValue: bogusValue, transform)
            DispatchQueue.main.async {
                completion(result, token)
            }
        }
    }
}
Not a fan of how this API reads, but at least it's safe, doesn't crash, and actually improves performance by not running the transform operation once cancelled.
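An alternative sketch that avoids both the bogus values and the unsafe buffer: cancel at batch granularity, in the style of concurrentMap4 above, so each batch lands in an optional slot and a cancelled run returns nil instead of a half-meaningful array. This assumes the ThreadSafe and CancellationToken types defined earlier; cancellableConcurrentMap is a made-up name:

extension RandomAccessCollection {
    /// Returns `self.map(transform)` computed in parallel, or `nil` if
    /// `token` was cancelled before every batch finished.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func cancellableConcurrentMap<B>(
        minBatchSize: Int = 4096,
        token: CancellationToken,
        _ transform: (Element) -> B
    ) -> [B]? {
        precondition(minBatchSize >= 1)
        let n = self.count
        let batchCount = (n + minBatchSize - 1) / minBatchSize
        if batchCount < 2 { return self.map(transform) }
        let batches = ThreadSafe(
            ContiguousArray<[B]?>(repeating: nil, count: batchCount))
        func batchStart(_ b: Int) -> Index {
            index(startIndex, offsetBy: b * n / batchCount)
        }
        DispatchQueue.concurrentPerform(iterations: batchCount) { b in
            if token.cancelled { return } // skip the remaining work
            let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
            batches.atomically { $0[b] = batch }
        }
        // If any batch was skipped, discard the whole result.
        let parts = batches.value
        if parts.contains(where: { $0 == nil }) { return nil }
        return parts.flatMap { $0! }
    }
}

The price is that a cancelled run throws away its partial work, but no uninitialized memory is ever exposed and no bogus values leak into the result.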
Sorry, @DenTelezhkin; I missed your remarks when you posted them.
That's fine. =) Thanks for the provided solutions anyway. The project this was needed for was finished a long time ago, and my last implementation seemed to work pretty well.
These results show that it's no faster to run the generic implementation over an UnsafeBufferPointer than it is to use a generic method directly on an array.
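One consequence of defining concurrentMap2 on RandomAccessCollection is that it also works directly on collections such as ranges, with no source array ever allocated. A quick usage sketch (not from the gist):

// Map in parallel straight off a Range; no input Array is materialized.
let squares = (0..<1_000_000).concurrentMap2 { $0 * $0 }
print(squares[999_999]) // 999998000001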