Last active
May 6, 2021 15:57
-
-
Save jayrhynas/943208bba3297930475ddaebb8df9117 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
/// A serial work queue backed by a dedicated pthread.
///
/// Closures submitted with `async(block:)` are appended to an internal queue and
/// executed in order on the worker thread created by `start(priority:)`.
///
/// While the worker thread is running it holds a strong reference to the queue,
/// so the queue is not deallocated until `stop(waitForFinish:)` has been called
/// and the thread has exited.
class ThreadQueue {
    // The pthread primitives live in heap storage so that the address handed to
    // the pthread functions stays the same across calls; a pointer obtained with
    // `&` on a stored property is only guaranteed valid for a single call.
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    private let cond = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)

    // true once both `mutex` and `cond` were successfully initialised,
    // i.e. once they must be destroyed in `deinit`
    private var isInitialized = false

    // these are shared variables and should only be read or modified within a `lock` block
    private var threadId: pthread_t? = nil
    private var isStopped = true
    private var queue: [() -> Void] = []

    /// Creates the queue and initialises its mutex and condition variable.
    ///
    /// - Throws: A `POSIXError` if either primitive could not be initialised.
    init() throws {
        mutex.initialize(to: pthread_mutex_t())
        cond.initialize(to: pthread_cond_t())

        try throwError(pthread_mutex_init(mutex, nil))
        try throwError(pthread_cond_init(cond, nil)) {
            // the mutex was already initialised, so tear it down again
            pthread_mutex_destroy(mutex)
        }
        isInitialized = true
    }

    deinit {
        // only destroy primitives that were actually initialised
        if isInitialized {
            pthread_cond_destroy(cond)
            pthread_mutex_destroy(mutex)
        }
        cond.deinitialize(count: 1)
        cond.deallocate()
        mutex.deinitialize(count: 1)
        mutex.deallocate()
    }

    /// Start running main
    ///
    /// If the thread is already started, this is a no-op
    ///
    /// - Note: After a non-waiting `stop()`, the previous thread may not have exited yet.
    ///   Use `stop(waitForFinish: true)` before restarting if that matters.
    ///
    /// - Parameter priority: The scheduler priority to use. Pass nil to use the system default priority.
    /// - Throws: A `POSIXError` if the thread attributes could not be configured or the thread could not be created.
    func start(priority: Int32? = 45) throws {
        // the whole check-and-create sequence runs under the lock so that two
        // concurrent callers cannot both observe `threadId == nil`
        try lock {
            guard threadId == nil else {
                return
            }

            var attr = pthread_attr_t()
            try throwError(pthread_attr_init(&attr))
            defer { pthread_attr_destroy(&attr) }

            // disable QoS participation
            try throwError(pthread_attr_setschedpolicy(&attr, SCHED_RR))

            // if we have a priority, add it to the attributes
            if let priority = priority {
                var param = sched_param(sched_priority: priority, __opaque: (0, 0, 0, 0))
                try throwError(pthread_attr_setschedparam(&attr, &param))
            }

            // The thread function has to be compatible with C function pointers,
            // so it cannot capture anything. Instead we pass `self` as a retained
            // opaque pointer; the +1 reference is consumed by the thread function,
            // which keeps the queue alive for as long as `main()` is running.
            let context = Unmanaged.passRetained(self)
            var newThread: pthread_t? = nil
            let result = pthread_create(&newThread, &attr, { ctx in
                let queue = Unmanaged<ThreadQueue>.fromOpaque(ctx).takeRetainedValue()
                queue.main()
                return nil
            }, context.toOpaque())

            try throwError(result) {
                // the thread never started, so nobody will consume the retained reference
                context.release()
            }

            // publish the new thread; it cannot observe these values before we
            // release the lock because `main()` acquires the lock first
            threadId = newThread
            isStopped = false
        }
    }

    /// Stop the thread from executing. Any blocks currently in the queue will execute before stopping.
    ///
    /// If the thread is not running, this is a no-op.
    ///
    /// - Parameter waitForFinish: If true, wait for the thread to exit before returning.
    ///   When called from the worker thread itself the thread is detached instead,
    ///   because a thread cannot join itself.
    func stop(waitForFinish: Bool = false) {
        var stoppedThread: pthread_t?
        lock {
            guard threadId != nil else { return }
            stoppedThread = threadId
            isStopped = true
            threadId = nil
            // signal the thread to wake up
            signal()
        }

        guard let thread = stoppedThread else { return }

        let isWorkerThread = pthread_equal(thread, pthread_self()) != 0
        if waitForFinish && !isWorkerThread {
            pthread_join(thread, nil)
        } else {
            pthread_detach(thread)
        }
    }

    /// The main body of the thread
    ///
    /// Repeatedly waits for work, takes everything currently queued and runs it
    /// outside the lock. Returns after running the batch during which a stop
    /// request was observed.
    fileprivate func main() {
        while true {
            var blocks: [() -> Void] = []
            var shouldExit = false

            // acquire the lock first
            lock {
                // wait for a block to appear in the queue, or for stop to be called
                while queue.isEmpty && !isStopped {
                    wait()
                }
                // grab all the current blocks and clear the shared queue
                blocks = queue
                queue.removeAll()
                // read the stop flag while we still hold the lock
                shouldExit = isStopped
            }

            // execute the blocks without holding the lock, so that blocks may
            // call `async` or `stop` themselves
            blocks.forEach { $0() }

            if shouldExit {
                return
            }
        }
    }

    /// Submits a block to the queue to be executed asynchronously on the thread
    /// - Parameter block: An escaping closure specifying the work to be performed.
    func async(block: @escaping () -> Void) {
        lock {
            queue.append(block)
            // signal the thread to wake up
            signal()
        }
    }

    // MARK: - Synchronization Helpers

    /// Executes `block` inside the lock
    ///
    /// The mutex is released even if `block` throws.
    ///
    /// - Parameter block: The work to be performed while locked.
    private func lock(_ block: () throws -> Void) rethrows {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        try block()
    }

    /// Signals the thread to wake up
    private func signal() {
        pthread_cond_signal(cond)
    }

    /// Tells the current thread to wait to be signalled
    ///
    /// Must be called while holding the lock.
    private func wait() {
        pthread_cond_wait(cond, mutex)
    }
}
/// Turns a non-zero POSIX-style return code into a thrown `POSIXError`.
///
/// A result of zero returns normally without running `cleanup`.
/// A result that maps to a `POSIXErrorCode` is thrown as that code; any other non-zero result is
/// thrown as `EINVAL` with the original result stored under `NSUnderlyingErrorKey` in the userInfo dictionary.
///
/// - Parameters:
///   - result: The return code to check.
///   - cleanup: Invoked once, just before the error is thrown.
private func throwError(_ result: Int32, cleanup: () -> Void = {}) throws {
    // success: nothing to throw, nothing to clean up
    guard result != 0 else { return }

    let failure: POSIXError
    if let knownCode = POSIXErrorCode(rawValue: result) {
        failure = POSIXError(knownCode)
    } else {
        failure = POSIXError(.EINVAL, userInfo: [NSUnderlyingErrorKey: result])
    }

    cleanup()
    throw failure
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment