Skip to content

Instantly share code, notes, and snippets.

@deze333
Created December 12, 2017 11:51
Show Gist options
  • Save deze333/d4cb80d9f95a5426a09125b9abd41d83 to your computer and use it in GitHub Desktop.
Save deze333/d4cb80d9f95a5426a09125b9abd41d83 to your computer and use it in GitHub Desktop.
Multi-thread to single-thread synchronized executor pattern in Swift (playground)
// Multi to single thread execution
// Credits go to https://www.reddit.com/r/swift/comments/7ijbt0/whats_a_good_way_to_make_sync_calls_from_swift_to/dr4iwxr/
import Foundation
import PlaygroundSupport
// Keep the playground running while the background threads work, and install
// a watchdog on the main queue that ends the page after 3000 seconds.
let playgroundPage = PlaygroundPage.current
playgroundPage.needsIndefiniteExecution = true
let watchdogDeadline: DispatchTime = .now() + .seconds(3000)
DispatchQueue.main.asyncAfter(deadline: watchdogDeadline) {
    print("Done")
    playgroundPage.finishExecution()
}
/// A mechanism that guarantees serial single-threaded execution of queued blocks.
///
/// Any number of caller threads may invoke `perform(_:)` concurrently. Every
/// block runs on one dedicated worker thread, in submission (FIFO) order, and
/// each caller is blocked until its own block has finished.
public class WorkUnitExecutor {

    /// The internal `WorkUnit` describes the block to perform and a "trap" that
    /// is laid by the caller thread and triggered by the worker thread when this
    /// work unit has been completed (and all preceding queued work units).
    private struct WorkUnit {
        /// The block to be invoked.
        let block: () -> ()

        /// The caller thread semaphore to be signaled by the worker thread.
        let trap = DispatchSemaphore(value: 0)

        /// Create a new `WorkUnit` with an active semaphore `trap`.
        init(_ block: @escaping () -> ()) {
            self.block = block
        }
    }

    /// State shared between caller threads and the worker thread.
    ///
    /// It lives in its own object so the worker thread only retains the
    /// mailbox, never the executor. That lets the executor be deallocated and
    /// the worker be told to exit from `deinit`.
    private final class Mailbox {
        /// Guards `pending` and `isShutDown`; also used to wake the worker.
        let condition = NSCondition()

        /// Work units waiting to run, oldest first.
        var pending: [WorkUnit] = []

        /// Set once by the executor's `deinit` to let the worker loop return.
        var isShutDown = false
    }

    /// The queue state shared with the worker thread.
    private let mailbox: Mailbox

    /// The internal worker thread that executes serially.
    private let thread: Thread

    /// Construct a new `WorkUnitExecutor` with a given specific label.
    /// - Parameter label: identifies the worker thread for debugging purposes.
    public init(label: String) {
        let mailbox = Mailbox()
        // The closure captures only `mailbox`, so there is no retain cycle
        // between the executor and its thread.
        let worker = Thread { WorkUnitExecutor.serve(mailbox) }
        worker.name = label
        self.mailbox = mailbox
        self.thread = worker
        worker.start()
    }

    /// Ask the worker thread to exit once the executor goes away.
    deinit {
        mailbox.condition.lock()
        mailbox.isShutDown = true
        mailbox.condition.signal()
        mailbox.condition.unlock()
        thread.cancel()
    }

    /// The main function of the worker thread.
    ///
    /// Sleeps on the condition until work arrives, then takes the whole pending
    /// batch under the lock and runs it with the lock released, so callers can
    /// keep enqueueing while blocks execute. Returns when the mailbox has been
    /// shut down and nothing is left to run.
    private static func serve(_ mailbox: Mailbox) {
        while true {
            mailbox.condition.lock()
            // Loop around `wait()` to tolerate spurious wake-ups.
            while mailbox.pending.isEmpty && !mailbox.isShutDown {
                mailbox.condition.wait()
            }
            let batch = mailbox.pending
            mailbox.pending.removeAll(keepingCapacity: true)
            mailbox.condition.unlock()

            // Only reachable with an empty batch after shutdown was requested.
            if batch.isEmpty {
                return
            }
            for unit in batch {
                unit.block()
                unit.trap.signal()
            }
        }
    }

    /// The auxiliary function invoked on any non-worker thread.
    ///
    /// Enqueues `block`, wakes the worker thread and blocks the calling thread
    /// until the block (and each preceding work unit) has completed.
    /// - Parameter block: the work to run on the worker thread.
    public func perform(_ block: @escaping () -> ()) {
        // Calling this from the worker thread would wait on itself forever.
        assert(Thread.current != self.thread, "This routine cannot be invoked on the worker thread.")
        let unit = WorkUnit(block)
        mailbox.condition.lock()
        mailbox.pending.append(unit)
        mailbox.condition.signal()
        mailbox.condition.unlock()
        unit.trap.wait()
    }
}
// Example C-style functions
/// Example stand-in for a C-style routine: triples `val`.
func c_calcA(val: Int) -> Int {
    let factor = 3
    return factor * val
}
/// Example stand-in for a C-style routine: multiplies `val` by five.
func c_calcB(val: Int) -> Int {
    let factor = 5
    return factor * val
}
/// Asynchronous caller thread.
///
/// Each `Caller` repeatedly routes a randomly chosen calculation through the
/// shared `WorkUnitExecutor`, checks the result against a direct call, and
/// contributes to a process-wide invocation counter used for the benchmark
/// report.
class Caller : Thread {
    /// The executor every calculation is funnelled through.
    let executor: WorkUnitExecutor

    /// When `true`, each executor block sleeps for a random time below one second.
    var isRandomSleep = true

    /// Timestamp taken the first time this property is touched.
    static let invocationsStarted = Date()

    /// Number of invocations completed by all callers so far.
    static var invocationsProcessed = 0

    /// Number of invocations after which the report is printed; also the
    /// number of iterations each caller runs.
    static var invocationsBenchmark = 1_000

    /// Serializes updates to `invocationsProcessed`, which is shared by every
    /// caller thread.
    private static let counterLock = NSLock()

    /// Create a caller bound to `executor`.
    /// - Parameter executor: the serial executor to submit work to.
    init(executor: WorkUnitExecutor) {
        self.executor = executor
        // Touch the lazy static so the benchmark clock starts no later than
        // the creation of the first caller.
        let _ = Caller.invocationsStarted
        super.init()
    }

    /// Thread entry point: runs the benchmark loop.
    override func main() {
        // `1...n` traps for n < 1, so bail out instead of crashing.
        guard Caller.invocationsBenchmark >= 1 else {
            return
        }
        for i in 1...Caller.invocationsBenchmark {
            let r: Int
            let verify: Int
            switch arc4random_uniform(2) {
            case 0:
                r = calcA(val: i)
                verify = c_calcA(val: i)
            default:
                r = calcB(val: i)
                verify = c_calcB(val: i)
            }
            if r != verify {
                print("😡 result mismatch, \(r) != \(verify)")
            }
            // Use the value returned under the lock rather than re-reading the
            // shared counter, which other threads keep changing.
            let processed = Caller.recordInvocation()
            if processed >= Caller.invocationsBenchmark {
                let dur = Date().timeIntervalSince(Caller.invocationsStarted)
                let invocationDuration = dur / Double(processed)
                print("Invocations processed : \(processed)")
                print("Invocations duration : \(dur)")
                print("Cost per invocation, sec: \(invocationDuration)")
                // and quit...
                PlaygroundPage.current.finishExecution()
            }
        }
    }

    /// Computes `c_calcA(val:)` on the executor's worker thread.
    /// - Parameter val: the input value.
    /// - Returns: the result produced on the worker thread.
    func calcA(val: Int) -> Int {
        return runOnExecutor(c_calcA(val:), val: val)
    }

    /// Computes `c_calcB(val:)` on the executor's worker thread.
    /// - Parameter val: the input value.
    /// - Returns: the result produced on the worker thread.
    func calcB(val: Int) -> Int {
        return runOnExecutor(c_calcB(val:), val: val)
    }

    /// Atomically increments the shared invocation counter.
    /// - Returns: the counter value right after this increment.
    private static func recordInvocation() -> Int {
        counterLock.lock()
        defer { counterLock.unlock() }
        invocationsProcessed += 1
        return invocationsProcessed
    }

    /// Runs `calculation(val)` inside an executor block, optionally followed by
    /// a random sleep, and hands the result back to the calling thread.
    private func runOnExecutor(_ calculation: @escaping (Int) -> Int, val: Int) -> Int {
        var result: Int?
        executor.perform {
            result = calculation(val)
            // Some random sleep
            if self.isRandomSleep {
                let t = arc4random_uniform(1000)
                Thread.sleep(forTimeInterval: Double(t) * 0.001)
            }
        }
        // `perform` is synchronous, so the block has run by the time it returns.
        guard let value = result else {
            preconditionFailure("executor.perform returned before running the block")
        }
        return value
    }
}
// Try it out: one shared executor, eleven caller threads with the random
// sleep turned off.
let executor = WorkUnitExecutor(label: "single thread executor instance")
var callers: [Caller] = []
(0...10).forEach { _ in
    let worker = Caller(executor: executor)
    worker.isRandomSleep = false
    callers.append(worker)
    worker.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment