Skip to content

Instantly share code, notes, and snippets.

@marciniwanicki
Last active July 24, 2019 23:42
Show Gist options
  • Save marciniwanicki/595e94ed9e0b6af436a5656f533a48b6 to your computer and use it in GitHub Desktop.
Save marciniwanicki/595e94ed9e0b6af436a5656f533a48b6 to your computer and use it in GitHub Desktop.
import Foundation
public class ConcurrentQueueExecutor<T> {
private let serialQueue = DispatchQueue(label: "SerialQueue")
private let concurrentQueue = DispatchQueue(label: "ConcurrentQueue", attributes: .concurrent)
private let context: ExecutorContext
public init() {
context = ExecutorContext(serialQueue: serialQueue)
}
public func iterate(_ array: [T], closure: @escaping (T, ExecutorContext) throws -> Void) {
array.forEach { element in
execute { context in try closure(element, context) }
}
}
public func execute(closure: @escaping (ExecutorContext) throws -> Void) {
concurrentQueue.async {
do {
try closure(self.context)
} catch {
self.context.error(error)
}
}
}
public func wait() throws {
try concurrentQueue.sync(flags: .barrier) {
try context.throwIfAnyError()
}
}
// MARK: - Private
private func createSync(_ closure: @escaping () -> Void) -> () -> Void {
return {
self.serialQueue.sync(execute: closure)
}
}
}
public class ExecutorContext {
private let serialQueue: DispatchQueue
private var errors: [Error] = []
init(serialQueue: DispatchQueue) {
self.serialQueue = serialQueue
}
public func sync(_ closure: @escaping () -> Void) {
serialQueue.sync(execute: closure)
}
public func sync<T>(_ closure: @escaping () -> T) -> T {
return serialQueue.sync(execute: closure)
}
func error(_ error: Error) {
sync { self.errors.append(error) }
}
func throwIfAnyError() throws -> Void {
let error: Error? = sync {
let first = self.errors.first
self.errors.removeAll()
return first
}
if let error = error {
throw error
}
}
}
public extension Array {
func concurrentMap<T>(_ closure: @escaping (Element) throws -> T) throws -> [T] {
let executor = ConcurrentQueueExecutor<Element>()
var results = [T]()
executor.iterate(self) { (element, context) in
let result = try closure(element)
context.sync { results.append(result) }
}
try executor.wait()
return results
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment