Last active
          April 3, 2024 04:59 
        
      - 
      
- 
        Save dabrahams/ea5495b4cccc2970cd56e8cfc72ca761 to your computer and use it in GitHub Desktop. 
    Concurrent Map Implementations, Benchmarked
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | // See commentary below this gist. | |
| import Foundation | |
| import QuartzCore | |
// Implementation from https://talk.objc.io/episodes/S01E90-concurrent-map

/// A box that funnels every synchronized access to its wrapped value through
/// one private serial queue.
public final class ThreadSafe<A> {
    /// The wrapped value. Touching this directly bypasses `queue`; use `value`
    /// or `atomically(_:)` for synchronized access.
    var _value: A

    /// Serial queue (the `DispatchQueue(label:)` default) guarding `_value`.
    let queue = DispatchQueue(label: "ThreadSafe")

    /// Wraps `value`.
    init(_ value: A) {
        _value = value
    }

    /// A copy of the wrapped value, read while on `queue`.
    var value: A {
        queue.sync { () -> A in
            _value
        }
    }

    /// Applies `transform` to the wrapped value in place while on `queue`, so
    /// concurrent callers never interleave their mutations.
    func atomically(_ transform: (inout A) -> ()) {
        queue.sync {
            transform(&_value)
        }
    }
}
extension Array {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// The array is split into `nthreads` contiguous chunks, one per
    /// `DispatchQueue.concurrentPerform` iteration. Each result is written into a
    /// `ThreadSafe` buffer of optionals, which is unwrapped at the end.
    ///
    /// - Parameters:
    ///   - nthreads: Number of chunks to split the work into. It defaults to
    ///     `count` (one element per chunk) and must be positive when given.
    ///   - transform: Mapping applied to every element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap1<B>(nthreads: Int? = nil, _ transform: (Element) -> B) -> [B] {
        // Nothing to split. This also avoids dividing by a zero chunk count
        // below when `nthreads` is defaulted to `count`.
        if isEmpty { return [] }
        let nt = nthreads ?? count
        precondition(nt > 0, "nthreads must be positive")
        let result = ThreadSafe([B?](repeating: nil, count: count))
        // Ceiling division: number of elements per chunk.
        let chunkSize = (count - 1) / nt + 1
        DispatchQueue.concurrentPerform(iterations: nt) { i in
            // Clamp both ends. When `nt` does not divide `count` evenly, trailing
            // chunks can start past the end, and `lower..<upper` must never invert.
            let lower = Swift.min(i * chunkSize, count)
            let upper = Swift.min(lower + chunkSize, count)
            for idx in lower..<upper {
                let transformed = transform(self[idx])
                result.atomically { $0[idx] = transformed }
            }
        }
        return result.value.map { $0! }
    }
}
// My generic implementation
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// The elements are split into `count / minBatchSize` contiguous batches of
    /// near-equal size, so every batch holds at least `minBatchSize` elements.
    /// Collections too small to form two such batches are mapped sequentially.
    /// Results are written directly into the uninitialized storage of the
    /// returned array.
    ///
    /// - Parameters:
    ///   - minBatchSize: Smallest number of elements handed to one
    ///     `concurrentPerform` iteration. Must be at least 1.
    ///   - transform: Mapping applied to every element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap2<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = count
        // Floor division. Rounding up made every batch *at most* `minBatchSize`,
        // contradicting the parameter's name. Floor division also cannot
        // overflow the way `n + minBatchSize - 1` could.
        let batchCount = n / minBatchSize
        if batchCount < 2 { return map(transform) }
        return Array(unsafeUninitializedCapacity: n) { buffer, initializedCount in
            // Non-nil: n >= 2 * minBatchSize > 0.
            let base = buffer.baseAddress!
            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                // Batch b covers offsets [b*n/batchCount, (b+1)*n/batchCount).
                // Consecutive batches tile 0..<n exactly with no overlap.
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(startIndex, offsetBy: startOffset)
                for p in (base + startOffset)..<(base + endOffset) {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
            // Every slot 0..<n was initialized by exactly one batch above.
            initializedCount = n
        }
    }
}
// This oughta be an optimization, but doesn't seem to be!
extension Array {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// Runs `concurrentMap2` over the array's storage, viewed through
    /// `withUnsafeBufferPointer`, rather than over the array itself.
    ///
    /// - Parameters:
    ///   - minBatchSize: Forwarded unchanged to `concurrentMap2`. Defaults to 4096.
    ///   - transform: Mapping applied to every element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap3<B>(minBatchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
        withUnsafeBufferPointer { storage in
            storage.concurrentMap2(minBatchSize: minBatchSize, transform)
        }
    }
}
// Implementation with no unsafe constructs.
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// Each `concurrentPerform` iteration maps one contiguous slice with the
    /// ordinary `map` and stores the resulting array in its own `ThreadSafe`
    /// slot. The slots are concatenated in order at the end.
    ///
    /// - Parameters:
    ///   - batchSize: Upper bound on elements per batch. The collection is split
    ///     into `ceil(count / batchSize)` near-equal batches. Must be at least 1;
    ///     defaults to 4096.
    ///   - transform: Mapping applied to every element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap4<B>(batchSize: Int = 4096, _ transform: (Element) -> B) -> [B] {
        precondition(batchSize >= 1)
        let n = count
        let batchCount = (n + batchSize - 1) / batchSize
        if batchCount < 2 { return map(transform) }
        // One slot per batch. `let` is enough because only the box's contents change.
        let batches = ThreadSafe(ContiguousArray<[B]?>(repeating: nil, count: batchCount))
        // Index of the first element of batch `b`; batchStart(batchCount) == endIndex.
        func batchStart(_ b: Int) -> Index {
            index(startIndex, offsetBy: b * n / batchCount)
        }
        DispatchQueue.concurrentPerform(iterations: batchCount) { b in
            let batch = self[batchStart(b)..<batchStart(b + 1)].map(transform)
            batches.atomically { $0[b] = batch }
        }
        return batches.value.flatMap { $0! }
    }
}
/// Benchmarks `transform` with the sequential `map` and each `concurrentMap`
/// variant, printing the elapsed times. It then traps with `fatalError` if any
/// variant's result differs from the sequential map's.
///
/// - Parameters:
///   - count: Number of elements (`0..<count`) to map. Must be non-negative.
///   - transform: Mapping to benchmark.
func test(count: Int, _ transform: (Int) -> Int) {
    precondition(count >= 0, "count must be non-negative")
    // Honour `count`. This used to be hard-coded to 0...655360, so the
    // slow-operation runs were ten times larger than requested.
    let hugeArray = Array(0..<count)

    /// Runs `f` once, returning the elapsed seconds and its result.
    func time<R>(_ f: () -> R) -> (time: Double, result: R) {
        let startTime = CACurrentMediaTime()
        let r = f()
        let t = CACurrentMediaTime() - startTime
        return (t, r)
    }

    let (t0, r0) = time { hugeArray.map(transform) }
    print("sequential map time:", t0, "(the one to beat)")
    let (t1, r1) = time { hugeArray.concurrentMap1(transform) }
    print("concurrentMap1 time:", t1)
    let (t2, r2) = time { hugeArray.concurrentMap2(transform) }
    print("concurrentMap2 time:", t2)
    let (t3, r3) = time { hugeArray.concurrentMap3(transform) }
    print("concurrentMap3 time:", t3)
    let (t4, r4) = time { hugeArray.concurrentMap4(transform) }
    print("concurrentMap4 time:", t4)

    // Verify correctness only after timing, so the checks don't skew the measurements.
    if r1 != r0 { fatalError("bad implementation 1") }
    if r2 != r0 { fatalError("bad implementation 2") }
    if r3 != r0 { fatalError("bad implementation 3") }
    if r4 != r0 { fatalError("bad implementation 4") }
}
let N = 65536

print("* Testing a fast operation, to show overhead")
test(count: N * 10) { $0 &+ 1 }
print()

print("* Testing slow operations")
let bigArray = Array(0...N)
// Workloads of N/16, N/8, N/4, N/2 and N elements. Each transform call folds the
// whole workload into its input with wrapping addition, so per-element cost grows.
for shift in 0..<5 {
    let M = (N >> 4) << shift
    let workload = bigArray.prefix(M)
    print("- workload size:", workload.count)
    test(count: N) { workload.reduce($0, &+) }
}
Update: it turns out my implementation was not actually cancelling anything, and when I made it cancel, everything crashed, as expected :) So far, the only reasonable approach I've come up with is initializing the remaining elements with bogus values instead of running the expensive transform operation:
extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel, with cooperative
    /// cancellation.
    ///
    /// Once `token.cancelled` reads `true`, each element not yet processed is
    /// filled with `bogusValue` instead of calling `transform`. The result buffer
    /// is therefore always fully initialized, as
    /// `Array(unsafeUninitializedCapacity:initializingWith:)` requires, and the
    /// remaining transform work is skipped.
    ///
    /// - Parameters:
    ///   - minBatchSize: Smallest number of elements per batch. Inputs too small
    ///     to form two such batches are mapped sequentially. Must be at least 1.
    ///   - token: Polled before each element is transformed.
    ///   - bogusValue: Placeholder stored for elements skipped after cancellation.
    ///   - transform: Mapping applied to every element.
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(minBatchSize: Int = 4096, token: CancellationToken = .init(), bogusValue: B, _ transform: (Element) -> B) -> [B] {
        precondition(minBatchSize >= 1)
        let n = self.count
        // Floor division, so every batch really holds at least `minBatchSize` elements.
        let batchCount = n / minBatchSize
        if batchCount < 2 {
            // Honour cancellation on the sequential path too; previously it was ignored here.
            return self.map { token.cancelled ? bogusValue : transform($0) }
        }

        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            // Non-nil: n >= 2 * minBatchSize > 0.
            let baseAddress = uninitializedMemory.baseAddress!

            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    // Every slot is initialized either way, so the buffer is
                    // never left partially uninitialized.
                    if token.cancelled {
                        p.initialize(to: bogusValue)
                    } else {
                        p.initialize(to: transform(self[sourceIndex]))
                    }
                    formIndex(after: &sourceIndex)
                }
            }
            resultCount = n
        }
    }
    /// Runs `concurrentMap` on a global queue of the given QoS. It then hands the
    /// result (possibly containing `bogusValue` placeholders) and `token` to
    /// `completion` on the main queue.
    ///
    /// - Parameters:
    ///   - qos: Quality-of-service class of the background queue.
    ///   - token: Forwarded to `concurrentMap` and passed back to `completion`,
    ///     presumably so the caller can check whether the work was cancelled.
    ///   - bogusValue: Placeholder for elements skipped after cancellation.
    ///   - transform: Mapping applied to every element.
    ///   - completion: Invoked on the main queue with the mapped array and `token`.
    func asyncConcurrentMap<T>(qos: DispatchQoS.QoSClass = .userInitiated, token: CancellationToken, bogusValue: T, transform: @escaping (Element) -> T, completion: @escaping ([T], CancellationToken) -> ()) {
        let background = DispatchQueue.global(qos: qos)
        background.async {
            let mapped = self.concurrentMap(token: token, bogusValue: bogusValue, transform)
            DispatchQueue.main.async { completion(mapped, token) }
        }
    }
}
Not a fan of how this API reads, but at least it's safe, doesn't crash, and actually improves performance by not running the transform operation.
Sorry, @DenTelezhkin; I missed your remarks when you posted them.
That's fine =) Thanks for the provided solutions anyway. The project this was needed for was finished a long time ago, and my last implementation seemed to work pretty well.
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
Hi @dabrahams!
Thanks for the great implementations and benchmarks!
I wonder if you happen to have any advice on using these asynchronously. In my case, I need concurrent processing for large sets of data, but I also want the interface to stay responsive. My current (very rough) implementation uses
`concurrentMap2` in the following way:

It works fine in my case; however, I'm pretty sure I'm walking in very dangerous territory, since the Array memory is not fully initialized because of cancellation. The documentation clearly states that
"The memory in the range `buffer[0..<initializedCount]` must be initialized at the end of the closure's execution", but I'm not sure how to cancel this operation differently. I would love to hear any of your thoughts on this!