Last active
February 21, 2021 10:06
-
-
Save Goheeca/d91e62fe50fcd798a8176fe6a38aca3b to your computer and use it in GitHub Desktop.
Pausable Threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/sbcl --script | |
(load "~/.sbclrc") | |
(ql:quickload '("bordeaux-threads") :silent t) | |
(defun color-formatter (color)
  "Return a FORMAT control string that wraps one ~? directive in escape codes.

COLOR is spliced into the opening sequence ESC[<COLOR>m (for example
\"32;1\"); the returned string ends with ESC[m.  The result is intended to be
passed to FORMAT followed by a nested control string and its argument list."
  (let ((escape #\Esc))
    ;; The doubled tilde emits a literal ~? so the directive survives into the
    ;; returned control string instead of being consumed here.
    (format nil "~a[~am~~?~a[m" escape color escape)))
(defvar *output-lock*
  (bt:make-lock "OUTPUT LOCK")
  "Lock held by SAFE-FORMAT while it writes, so output from different threads
does not interleave.")
(defmacro safe-format (stream fmt &rest args)
  "FORMAT serialized on *OUTPUT-LOCK*, followed by a flush of the destination.

STREAM, FMT and ARGS are passed to FORMAT unchanged and are evaluated while
the lock is held.  STREAM is evaluated exactly once.  The stream that was
actually written to is flushed: *STANDARD-OUTPUT* when STREAM is T, the stream
itself when STREAM is a stream; nothing is flushed for NIL or a string
destination.  Returns whatever FORMAT returns (the string when STREAM is NIL)."
  (let ((destination (gensym "DESTINATION")))
    `(bt:with-lock-held (*output-lock*)
       (let ((,destination ,stream))
         (prog1 (format ,destination ,fmt ,@args)
           (cond ((eq ,destination t) (force-output *standard-output*))
                 ((streamp ,destination) (force-output ,destination))))))))
(defvar *progress-monitor* nil
  "SYNCHRONIZED-VALUE wrapping a hash table of per-worker counters; assigned
in MAIN and written by WORKER.  NIL until MAIN runs.")

(defvar *current-pausable-thread* nil
  "The PAUSABLE-THREAD object of the running thread.  Bound by the thread
started in the PAUSABLE-THREAD initializer; NIL in any other thread.")
(defclass synchronized-value ()
  ((sv-name
    :accessor sv-name
    :initarg :name
    :documentation "Name of the value; used to label the guard lock.")
   (sv-value
    :accessor sv-value
    :initarg :value
    :documentation "The wrapped value.")
   (sv-guard-lock
    :accessor sv-guard-lock
    :initform nil
    :documentation "Recursive lock created by the :AFTER initializer."))
  (:documentation
   "A value paired with a recursive lock.  Access is meant to happen inside
WITH-SYNCHRONIZED-VALUE; the accessors themselves do no locking."))
(defmethod initialize-instance :after ((sv synchronized-value) &key)
  "Create the guard lock for SV, labelled with SV's name."
  (let ((lock-name (concatenate 'string "guard lock of " (sv-name sv))))
    (setf (sv-guard-lock sv)
          (bt:make-recursive-lock lock-name))))
(defun make-synchronized-value (value name)
  "Return a new SYNCHRONIZED-VALUE holding VALUE and labelled NAME."
  (make-instance 'synchronized-value
                 :name name
                 :value value))
(defmacro with-synchronized-value ((sv) &body body)
  "Run BODY while holding the recursive guard lock of the SYNCHRONIZED-VALUE
that the form SV evaluates to."
  (list* 'bt:with-recursive-lock-held
         (list (list 'sv-guard-lock sv))
         body))
(defclass pausable-thread ()
  ((name
    :accessor name
    :initarg :name
    :documentation "Thread name; also used to label the locks and condition variables.")
   (function
    :accessor thread-function
    :initarg :thread-function
    :documentation "Zero-argument function called in the new thread.")
   (thread
    :accessor thread
    :initform nil
    :documentation "The underlying bordeaux-threads thread, set by the initializer.")
   (thread-lock
    :accessor thread-lock
    :initform nil
    :documentation "Recursive lock used by WITH-PAUSED-P, WAIT-PAUSABLE and SET-PAUSABLE.")
   (signal-lock
    :accessor signal-lock
    :initform nil
    :documentation "Lock paired with the pausing and unpausing condition variables.")
   (paused
    :accessor paused
    :initform nil
    :documentation "Set to T by PAUSE% before it blocks and back to NIL after it resumes.")
   (pausable
    :accessor pausable
    :initform t
    :documentation "Written by SET-PAUSABLE; NIL while the thread is inside UNPAUSABLE.")
   (pausable-condition
    :accessor pausable-condition
    :initform nil
    :documentation "Condition variable notified whenever PAUSABLE is written.")
   (pausing
    :accessor pausing
    :initform nil
    :documentation "Flag set by PAUSE% to acknowledge a pause; cleared by PAUSE.")
   (pausing-condition
    :accessor pausing-condition
    :initform nil
    :documentation "Condition variable notified when PAUSING is set.")
   (unpausing
    :accessor unpausing
    :initform nil
    :documentation "Flag set by UNPAUSE to request a resume; cleared by PAUSE%.")
   (unpausing-condition
    :accessor unpausing-condition
    :initform nil
    :documentation "Condition variable notified when UNPAUSING is set.")
   (color
    :accessor color
    :initarg :color
    :initform ""
    :documentation "String handed to COLOR-FORMATTER by THREAD-FORMATTING."))
  (:documentation
   "A thread plus the locks, flags and condition variables that PAUSE,
UNPAUSE and UNPAUSABLE use to suspend and resume it.  The thread is started
by the :AFTER initializer."))
(defmethod initialize-instance :after ((pt pausable-thread) &key)
  "Create PT's locks and condition variables, then start its thread.

The thread is created last, after every synchronization object exists.  It
binds *CURRENT-PAUSABLE-THREAD* to PT and calls PT's thread function."
  (flet ((labelled (prefix)
           ;; Every synchronization object is named \"<prefix><thread name>\".
           (concatenate 'string prefix (name pt))))
    (setf (thread-lock pt)
          (bt:make-recursive-lock (labelled "thread lock of ")))
    (setf (signal-lock pt)
          (bt:make-lock (labelled "signal lock of ")))
    (setf (pausable-condition pt)
          (bt:make-condition-variable :name (labelled "pausable condition of ")))
    (setf (pausing-condition pt)
          (bt:make-condition-variable :name (labelled "pausing condition of ")))
    (setf (unpausing-condition pt)
          (bt:make-condition-variable :name (labelled "unpausing condition of "))))
  (let ((entry-point (lambda ()
                       (let ((*current-pausable-thread* pt))
                         (funcall (thread-function pt))))))
    (setf (thread pt)
          (bt:make-thread entry-point :name (name pt)))))
(defun make-pausable-thread (function name &key color)
  "Create and start a PAUSABLE-THREAD named NAME that runs FUNCTION.

FUNCTION is called with no arguments in the new thread.  COLOR is the string
later handed to COLOR-FORMATTER; when it is omitted or NIL the empty string
is used, matching the slot's own default, so that the text \"NIL\" never ends
up inside the generated escape sequence."
  (make-instance 'pausable-thread
                 :thread-function function
                 :name name
                 :color (or color "")))
(defun join-pausable-thread (pt)
  "Wait for PT's underlying thread to finish; returns what BT:JOIN-THREAD returns."
  (let ((underlying (thread pt)))
    (bt:join-thread underlying)))
(defun thread-formatting ()
  "Return the COLOR-FORMATTER control string for the calling thread.

Uses the color of *CURRENT-PAUSABLE-THREAD* when that is bound to a thread
object, and \"31;1\" otherwise."
  (let ((chosen-color (if *current-pausable-thread*
                          (color *current-pausable-thread*)
                          "31;1")))
    (color-formatter chosen-color)))
(defmacro wait%% (type (condition test lock) &body body)
  "Expand into (TYPE (LOCK) ...) that waits on CONDITION until TEST is true,
then runs BODY, all inside the TYPE form.

TYPE names the lock-holding macro to wrap with (WAIT% passes
BT:WITH-LOCK-HELD).  TEST is re-evaluated after every wake-up, so a wake-up
that finds TEST still false simply waits again."
  `(,type (,lock)
     (do ()
         (,test)
       (bt:condition-wait ,condition ,lock))
     ,@body))
(defmacro wait% ((condition test lock) &body body)
  "WAIT%% specialised to BT:WITH-LOCK-HELD: hold LOCK, wait on CONDITION until
TEST is true, then run BODY with the lock still held."
  (list* 'wait%%
         'bt:with-lock-held
         (list condition test lock)
         body))
(defmacro wait-pausable ((pt) &body body)
  "Hold PT's recursive thread lock, wait until (PAUSABLE PT) is true, then run
BODY with the lock still held.  The form PT appears three times in the
expansion."
  `(wait%% bt:with-recursive-lock-held
       ((pausable-condition ,pt)
        (pausable ,pt)
        (thread-lock ,pt))
     ,@body))
(defmacro notify%% (type (condition test lock) &body body)
  "Expand into (TYPE (LOCK) ...) that stores the value of BODY into the place
TEST and then notifies CONDITION, both while inside the TYPE form.

TYPE names the lock-holding macro to wrap with.  TEST must be a SETF-able
place."
  (let ((store-form `(setf ,test (progn ,@body)))
        (signal-form `(bt:condition-notify ,condition)))
    `(,type (,lock)
       ,store-form
       ,signal-form)))
(defmacro notify% ((condition test lock) &body body)
  "NOTIFY%% specialised to BT:WITH-LOCK-HELD: under LOCK, set the place TEST to
the value of BODY and notify CONDITION."
  (list* 'notify%%
         'bt:with-lock-held
         (list condition test lock)
         body))
(defun unpause (pt)
  "Ask PT to resume: under PT's signal lock, set its UNPAUSING flag to T and
notify its unpausing condition variable."
  (notify%% bt:with-lock-held
      ((unpausing-condition pt)
       (unpausing pt)
       (signal-lock pt))
    t))
(defun set-pausable (val)
  "Set the PAUSABLE flag of *CURRENT-PAUSABLE-THREAD* to VAL and notify its
pausable condition variable, all under that thread's recursive thread lock."
  (let ((self *current-pausable-thread*))
    (notify%% bt:with-recursive-lock-held
        ((pausable-condition self)
         (pausable self)
         (thread-lock self))
      val)))
(defmacro pause%% ((condition test lock) (&body pre-pause) (&body post-pause))
  "Expand into a PROGN that runs the PRE-PAUSE forms, then a WAIT% on
CONDITION / TEST / LOCK whose body is the POST-PAUSE forms.

PRE-PAUSE runs before the lock is taken; POST-PAUSE runs after TEST became
true, with LOCK held."
  (let ((blocking-form (list* 'wait%
                              (list condition test lock)
                              post-pause)))
    (append '(progn)
            pre-pause
            (list blocking-form))))
(defun pause% ()
  "Block the calling pausable thread until it is unpaused.

Run inside the target thread (PAUSE delivers it with BT:INTERRUPT-THREAD).
Steps: print \"Pausing.\", mark the thread PAUSED, acknowledge to the
controller by setting PAUSING and notifying the pausing condition, then wait
on the unpausing condition.  On wake-up, clear UNPAUSING, print
\"Unpausing.\" and clear PAUSED."
  (let ((self *current-pausable-thread*))
    (pause%% ((unpausing-condition self) (unpausing self) (signal-lock self))
      ((safe-format t "~?" (thread-formatting) `("~a" ("Pausing.")))
       (setf (paused self) t)
       (notify% ((pausing-condition self) (pausing self) (signal-lock self))
         ;; Drop any stale resume request before acknowledging.  A second
         ;; UNPAUSE can arrive after the previous wake-up cleared UNPAUSING
         ;; but before PAUSED was cleared; left in place, it would make this
         ;; pause return immediately.  Done under the signal lock, and before
         ;; the controller is released from PAUSE, so no request that belongs
         ;; to this pause can be lost.
         (setf (unpausing self) nil)
         t))
      ((setf (unpausing self) nil)
       (safe-format t "~?" (thread-formatting) `("~a" ("Unpausing.")))
       (setf (paused self) nil)))))
(defmacro with-paused-p ((var pt) &body body)
  "Run BODY under PT's recursive thread lock with VAR bound to the value of
(PAUSED PT) read right after the lock was taken.  The form PT appears twice
in the expansion."
  (let ((binding (list var (list 'paused pt)))
        (lock-form (list 'thread-lock pt)))
    `(bt:with-recursive-lock-held (,lock-form)
       (let (,binding)
         ,@body))))
(defun pause (pt)
  "Pause PT and return once it has acknowledged.

Interrupts PT's thread so that it runs PAUSE%, then waits on PT's pausing
condition until the PAUSING flag is set, and clears the flag again while
still holding the signal lock."
  (let ((target (thread pt)))
    (bt:interrupt-thread target #'pause%))
  (wait% ((pausing-condition pt) (pausing pt) (signal-lock pt))
    ;; Reset the acknowledgement so the next PAUSE waits for a fresh one.
    (setf (pausing pt) nil)))
(defun hello ()
  "Print a greeting line containing the calling thread's name, colored via
THREAD-FORMATTING."
  (safe-format t "~&Hello from ~?.~%"
               (thread-formatting)
               (list "~a" (list (bt:thread-name (bt:current-thread))))))
(defun status (count)
  "Print COUNT followed by a period, colored via THREAD-FORMATTING."
  (let ((nested-control "~a.")
        (nested-args (list count)))
    (safe-format t "~?" (thread-formatting) (list nested-control nested-args))))
(defmacro unpausable (&body body)
  "Run BODY with the current pausable thread marked as not pausable.

The thread's PAUSABLE flag is set to NIL via SET-PAUSABLE before BODY runs.
When BODY exits, normally or not, the flag is restored to the value it had on
entry.  Restoring rather than forcing T keeps nested uses correct: leaving an
inner UNPAUSABLE no longer makes the rest of the enclosing one pausable.

Must be used in a thread where *CURRENT-PAUSABLE-THREAD* is bound to a thread
object."
  (let ((previous (gensym "PREVIOUS")))
    ;; Only the owning thread writes its PAUSABLE flag, so reading it here
    ;; without the thread lock sees this thread's own last write.
    `(let ((,previous (pausable *current-pausable-thread*)))
       (unwind-protect
            (progn
              (set-pausable nil)
              ,@body)
         (set-pausable ,previous)))))
;; Reader shorthand: #!FORM reads as (UNPAUSABLE FORM).  Installed in the
;; current readtable, so it affects everything read after this point.
(set-dispatch-macro-character
 #\# #\!
 (lambda (stream sub-char numarg)
   (declare (ignore sub-char numarg))
   (list 'unpausable (read stream t nil t))))
(defun worker (&optional (delay 1))
  "Body of a demo worker thread; never returns.

Greets once, then loops forever: sleep DELAY seconds, then, inside an
UNPAUSABLE section and under the progress monitor's lock, record the current
counter under this thread's name and print it.  The counter starts at 0 and
grows by one per iteration."
  (hello)
  (loop for i from 0
        do (sleep delay)
           ;; Same form the #! reader shorthand produces.
           (unpausable
             (with-synchronized-value (*progress-monitor*)
               (setf (gethash (name *current-pausable-thread*)
                              (sv-value *progress-monitor*))
                     i)
               (status i)))))
(defun main ()
  "Run the demo: start three workers, then randomly pause and resume them.

The progress monitor is created and seeded with one NIL entry per worker
before any worker thread is started, so a worker can never see
*PROGRESS-MONITOR* unset or race with the seeding.  The table uses EQUAL
because its keys are the workers' name strings.

The controller loop then repeats forever: sleep a random 0 to 0.9 seconds,
pick a random worker, and either unpause it (if it is paused) or print the
current progress table, wait until the worker is pausable, and pause it."
  (let ((specs '(("worker A" 0.01 "32;1")
                 ("worker B" 0.02 "33;1")
                 ("worker C" 0.015 "34;1")))
        (progress (make-hash-table :test #'equal)))
    (dolist (spec specs)
      (setf (gethash (first spec) progress) nil))
    (setf *progress-monitor* (make-synchronized-value progress "progress monitor"))
    ;; Threads start running as soon as they are created, so this must come
    ;; after the monitor is in place.
    (let ((workers (mapcar (lambda (spec)
                             (destructuring-bind (name delay color) spec
                               (make-pausable-thread (lambda () (worker delay))
                                                     name
                                                     :color color)))
                           specs)))
      (hello)
      (loop
        (sleep (/ (random 10) 10))
        (let ((who (nth (random (length workers)) workers)))
          (with-paused-p (is-paused who)
            (if is-paused
                (progn
                  (safe-format t "~?" (thread-formatting) `("Unpause ~a." (,(name who))))
                  (unpause who))
                (progn
                  (safe-format t "~?" (thread-formatting) `("Pause ~a." (,(name who))))
                  (with-synchronized-value (*progress-monitor*)
                    (safe-format t "~?" (thread-formatting)
                                 `("~{~{~a~^: ~}~^, ~}." (,(loop for k being the hash-key using (hash-value v) of (sv-value *progress-monitor*) collect `(,k ,v))))))
                  ;; Only interrupt the worker once it is outside any
                  ;; UNPAUSABLE section.
                  (wait-pausable (who)
                    (pause who)))))))
      ;; Not reached while the loop above has no exit; kept so the workers are
      ;; joined if one is ever added.
      (loop for worker in workers do (join-pausable-thread worker)))))
;; Script entry point.  On SBCL's interactive-interrupt condition (presumably
;; raised by Ctrl-C -- confirm), start a fresh output line and exit.
(handler-case (main)
  (sb-sys:interactive-interrupt ()
    (fresh-line)
    (sb-ext:exit)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment