Skip to content

Instantly share code, notes, and snippets.

@jayrhynas
Last active May 6, 2021 15:57
Show Gist options
  • Save jayrhynas/943208bba3297930475ddaebb8df9117 to your computer and use it in GitHub Desktop.
Save jayrhynas/943208bba3297930475ddaebb8df9117 to your computer and use it in GitHub Desktop.
import Foundation
/// A serial work queue that executes submitted blocks on a dedicated pthread.
///
/// Blocks passed to `async(block:)` are appended to a shared queue and run in submission order
/// by the worker thread created in `start(priority:)`.
///
/// While a worker thread is running it holds a strong reference to the queue, so the queue is
/// only deallocated after `stop(waitForFinish:)` has been called and the worker has exited.
class ThreadQueue {
    private var mutex = pthread_mutex_t()
    private var cond = pthread_cond_t()

    // record which primitives were successfully initialised so `deinit` only destroys those
    private var isMutexInitialized = false
    private var isCondInitialized = false

    // these are shared variables and must only be read or modified within a `lock` block
    private var threadId: pthread_t? = nil
    private var isStopped = true
    private var queue: [() -> Void] = []

    /// Creates a stopped queue. Call `start(priority:)` to begin executing blocks.
    ///
    /// - Throws: `POSIXError` if the mutex or condition variable cannot be initialised.
    init() throws {
        try throwError(pthread_mutex_init(&mutex, nil))
        isMutexInitialized = true
        try throwError(pthread_cond_init(&cond, nil)) {
            // don't leave a half-initialised pair behind
            pthread_mutex_destroy(&mutex)
            isMutexInitialized = false
        }
        isCondInitialized = true
    }

    deinit {
        // a running worker retains `self`, so no thread can be using these primitives here
        if isCondInitialized {
            pthread_cond_destroy(&cond)
        }
        if isMutexInitialized {
            pthread_mutex_destroy(&mutex)
        }
    }

    /// Start running main
    ///
    /// If the thread is already started, this is a no-op
    ///
    /// - Parameter priority: The scheduler priority to use. Pass nil to use the system default priority.
    /// - Throws: `POSIXError` if the thread attributes cannot be configured or the thread cannot be created.
    func start(priority: Int32? = 45) throws {
        // The whole start sequence runs under the lock so that concurrent `start`/`stop` calls
        // cannot interleave, and so the new thread (which takes the lock first thing in `main`)
        // cannot observe `threadId` before it has been assigned.
        try lock {
            guard threadId == nil else {
                return
            }

            var attr = pthread_attr_t()
            try throwError(pthread_attr_init(&attr))
            defer { pthread_attr_destroy(&attr) }

            // disable QoS participation
            try throwError(pthread_attr_setschedpolicy(&attr, SCHED_RR))

            // if we have a priority, add it to the attributes
            if let priority = priority {
                var param = sched_param(sched_priority: priority, __opaque: (0, 0, 0, 0))
                try throwError(pthread_attr_setschedparam(&attr, &param))
            }

            // The thread function has to be compatible with C function pointers, so it cannot
            // capture anything. Instead we hand it a +1 retained reference to `self` as the
            // context pointer; the reference stays valid after `start` returns and keeps the
            // queue alive for as long as the worker runs.
            let retainedSelf = Unmanaged.passRetained(self)
            var newThread: pthread_t? = nil
            try throwError(pthread_create(&newThread, &attr, { context in
                // take ownership of the retained reference and call `main()`
                let queue = Unmanaged<ThreadQueue>.fromOpaque(context).takeRetainedValue()
                queue.main()
                return nil
            }, retainedSelf.toOpaque())) {
                // the thread never started, so nobody else will balance the retain
                retainedSelf.release()
            }

            // only publish the new state once the thread really exists
            threadId = newThread
            isStopped = false
        }
    }

    /// Stop the thread from executing. Any blocks currently in the queue will execute before stopping.
    ///
    /// If the queue is restarted before a previous worker has finished draining, the old worker may
    /// still be executing its final batch; pass `waitForFinish: true` when strict ordering across a
    /// restart is required.
    ///
    /// - Parameter waitForFinish: If true, wait for the thread to exit before returning.
    ///   Ignored when called from the worker thread itself, since a thread cannot join itself.
    func stop(waitForFinish: Bool = false) {
        var claimedThread: pthread_t?
        lock {
            // claim the thread under the lock so that exactly one caller joins/detaches it
            guard let thread = threadId else { return }
            claimedThread = thread
            isStopped = true
            threadId = nil
            // signal the thread to wake up
            signal()
        }
        guard let thread = claimedThread else { return }

        let calledFromWorker = pthread_equal(thread, pthread_self()) != 0
        if waitForFinish && !calledFromWorker {
            pthread_join(thread, nil)
        } else {
            pthread_detach(thread)
        }
    }

    /// The main body of the thread
    fileprivate func main() {
        let currentThread = pthread_self()
        var keepRunning = true

        while keepRunning {
            var blocks: [() -> Void] = []
            // acquire the lock first
            lock {
                // wait for a block to appear in the queue, or for stop to be called
                while queue.isEmpty && isActive(currentThread) {
                    wait()
                }
                // grab all the current blocks and clear the shared queue
                blocks = queue
                queue.removeAll()
                // decide whether to loop again while we still hold the lock
                keepRunning = isActive(currentThread)
            }
            // execute the blocks outside the lock so they can call `async`/`stop` themselves
            blocks.forEach { $0() }
        }
    }

    /// Submits a block to the queue to be executed asynchronously on the thread
    /// - Parameter block: An escaping closure specifying the work to be performed.
    func async(block: @escaping () -> Void) {
        lock {
            queue.append(block)
            // signal the thread to wake up
            signal()
        }
    }

    // MARK: - Synchronization Helpers

    /// Executes `block` inside the lock
    ///
    /// The mutex is released even if `block` throws.
    /// - Parameter block: The work to be performed while locked.
    private func lock(_ block: () throws -> Void) rethrows {
        pthread_mutex_lock(&mutex)
        defer { pthread_mutex_unlock(&mutex) }
        try block()
    }

    /// Returns true if `thread` is the worker that should currently be servicing the queue.
    ///
    /// Must be called inside a `lock` block. A worker that has been stopped stays inactive even
    /// if the queue is started again afterwards, because the restart registers a different thread.
    private func isActive(_ thread: pthread_t) -> Bool {
        guard !isStopped, let active = threadId else { return false }
        return pthread_equal(active, thread) != 0
    }

    /// Signals the thread to wake up
    private func signal() {
        pthread_cond_signal(&cond)
    }

    /// Tells the current thread to wait to be signalled. Must be called inside a `lock` block.
    private func wait() {
        pthread_cond_wait(&cond, &mutex)
    }
}
/// If the passed in result is non-zero, throws the error as a POSIXError
///
/// If the result is not a valid POSIXErrorCode, EINVAL is thrown and the original result is
/// attached as an `NSError` in `NSPOSIXErrorDomain` under `NSUnderlyingErrorKey`.
///
/// - Parameters:
///   - result: The return value of a POSIX call; zero means success.
///   - cleanup: Invoked once, just before throwing, when `result` is non-zero. Not called on success.
/// - Throws: `POSIXError` describing `result` when it is non-zero.
private func throwError(_ result: Int32, cleanup: () -> Void = {}) throws {
    guard result != 0 else { return }

    // run the caller's rollback before the error propagates
    cleanup()

    if let code = POSIXErrorCode(rawValue: result) {
        throw POSIXError(code)
    }

    // NSUnderlyingErrorKey is documented to hold an error object, so wrap the raw code in one
    let underlying = NSError(domain: NSPOSIXErrorDomain, code: Int(result))
    throw POSIXError(.EINVAL, userInfo: [NSUnderlyingErrorKey: underlying])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment