Created
December 12, 2017 11:51
-
-
Save deze333/d4cb80d9f95a5426a09125b9abd41d83 to your computer and use it in GitHub Desktop.
Multi-thread to single-thread synchronized executor pattern in Swift (playground)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Multi to single thread execution | |
// Credits go to https://www.reddit.com/r/swift/comments/7ijbt0/whats_a_good_way_to_make_sync_calls_from_swift_to/dr4iwxr/ | |
import Foundation | |
import PlaygroundSupport | |
PlaygroundPage.current.needsIndefiniteExecution = true | |
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3000)) { | |
print("Done") | |
PlaygroundPage.current.finishExecution() | |
} | |
/// A mechanism that guarantees serial single-threaded execution of queued blocks. | |
public class WorkUnitExecutor { | |
/// The internal `WorkUnit` describes the block to perform and a "trap" that | |
/// is laid by the caller thread and triggered by the worker thread when this | |
/// work unit has been completed (and all preceeding queued work units). | |
private struct WorkUnit { | |
/// The block to be invoked. | |
var block: () -> () | |
/// The caller thread semaphore to be signaled by the worker thread. | |
var trap = DispatchSemaphore(value: 0) | |
/// Create a new `WorkUnit` with an active semaphore `trap`. | |
init(_ block: @escaping () -> ()) { | |
self.block = block | |
} | |
} | |
/// The internal worker thread that executes serially. | |
private var thread: Thread! | |
/// The lock for accessing the queue, a shared resource. | |
private var lock = NSObject() | |
/// The queue containing all the work units to be executed by the worker thread. | |
/// Modifying the queue automatically causes a synchronization on `lock`. | |
private var queue: [WorkUnit] = [] { | |
willSet { | |
objc_sync_enter(self.lock) | |
} | |
didSet { | |
objc_sync_exit(self.lock) | |
// Automatically signal the worker thread if the queue is modified. | |
self.semaphore.signal() | |
} | |
} | |
/// The main semaphore that informs the worker thread that a new work unit has | |
/// been queued and needs attention. | |
private var semaphore = DispatchSemaphore(value: 0) | |
/// Construct a new `WorkUnitExecutor` with a given specific label. | |
/// - `label`: identifies the worker thread for debugging purposes. | |
public init(label: String) { | |
self.thread = Thread(block: self.main) | |
self.thread.name = label | |
self.thread.start() | |
} | |
/// Be sure to cancel the thread when finished. | |
/// FIXME: I don't think this actually stops the thread for good. :( | |
deinit { | |
self.thread.cancel() | |
} | |
/// The main function of the worker thread. | |
private func main() { | |
assert(Thread.current == self.thread, "This routine can only be invoked on the worker thread.") | |
while true { | |
// Infinitely trap and process queued work units until shut down. | |
// Once woken, process all queued work units and signal their owners. | |
// NOTE: objc_sync_* is not required since `popLast()` is mutating. See above. | |
while true { | |
//objc_sync_enter(self.lock) | |
if let unit = self.queue.popLast() { | |
//objc_sync_exit(self.lock) | |
unit.block() | |
unit.trap.signal() | |
} else { | |
//objc_sync_exit(self.lock) | |
break | |
} | |
} | |
self.semaphore.wait() | |
} | |
} | |
/// The auxiliary function invoked on any non-worker thread. | |
public func perform(_ block: @escaping () -> ()) { | |
assert(Thread.current != self.thread, "This routine cannot be invoked on the worker thread.") | |
// Push a new work unit and signal the worker thread before trapping. | |
// The caller thread will wait here until each preceeding work has completed. | |
let unit = WorkUnit(block) | |
self.queue.insert(unit, at: 0) | |
unit.trap.wait() | |
} | |
} | |
// Example C-style functions | |
func c_calcA(val: Int) -> Int { | |
return val * 3 | |
} | |
func c_calcB(val: Int) -> Int { | |
return val * 5 | |
} | |
/// Asynchronous caller thread | |
class Caller : Thread { | |
let executor: WorkUnitExecutor | |
var isRandomSleep = true | |
static let invocationsStarted = Date() | |
static var invocationsProcessed = 0 | |
static var invocationsBenchmark = 1_000 | |
init(executor: WorkUnitExecutor) { | |
self.executor = executor | |
let _ = Caller.invocationsStarted | |
} | |
override func main() { | |
for i in 1...Caller.invocationsBenchmark { | |
let r: Int | |
let verify: Int | |
switch arc4random_uniform(2) { | |
case 0: | |
r = calcA(val: i) | |
verify = c_calcA(val: i) | |
default: | |
r = calcB(val: i) | |
verify = c_calcB(val: i) | |
} | |
if r != verify { | |
print("😡 result mismatch, \(r) != \(verify)") | |
} | |
Caller.invocationsProcessed += 1 | |
if Caller.invocationsProcessed >= Caller.invocationsBenchmark { | |
let dur = Date().timeIntervalSince(Caller.invocationsStarted) | |
let invocationDuration = dur / Double(Caller.invocationsProcessed) | |
print("Invocations processed : \(Caller.invocationsProcessed)") | |
print("Invocations duration : \(dur)") | |
print("Cost per invocation, sec: \(invocationDuration)") | |
// and quit... | |
PlaygroundPage.current.finishExecution() | |
} | |
} | |
} | |
func calcA(val: Int) -> Int { | |
var result: Int! | |
executor.perform { | |
result = c_calcA(val: val) | |
// Some random sleep | |
if self.isRandomSleep { | |
let t = arc4random_uniform(1000) | |
Thread.sleep(forTimeInterval: Double(t) * 0.001) | |
} | |
} | |
return result | |
} | |
func calcB(val: Int) -> Int { | |
var result: Int! | |
executor.perform { | |
result = c_calcB(val: val) | |
// Some random sleep | |
if self.isRandomSleep { | |
let t = arc4random_uniform(1000) | |
Thread.sleep(forTimeInterval: Double(t) * 0.001) | |
} | |
} | |
return result | |
} | |
} | |
// Try it out | |
let executor = WorkUnitExecutor(label: "single thread executor instance") | |
var callers: [Caller] = [] | |
for _ in 0...10 { | |
let caller = Caller(executor: executor) | |
caller.isRandomSleep = false | |
callers.append(caller) | |
caller.start() | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment