Created
March 25, 2023 20:36
-
-
Save faust45/34c8421118f05152ae8f9ba9afcc706e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(define-module (async) | |
#:export (make-channel | |
ch-write | |
ch-read | |
fiber)) | |
(use-modules (ice-9 match) | |
(ice-9 atomic) | |
(ice-9 q) | |
(ice-9 threads) | |
(ice-9 futures) | |
(ice-9 control)) | |
;; Build a fresh channel.  The result is the improper list
;; (MUTEX MESSAGES . LISTENERS): a new mutex followed by two new,
;; empty queues.
(define (make-channel)
  (cons* (make-mutex) (make-q) (make-q)))
;; Field accessors for a channel laid out as (MUTEX MESSAGES . LISTENERS).
(define (ch-mutex ch) (car ch))
(define (ch-messages ch) (cadr ch))
(define (ch-listeners ch) (cddr ch))
;; Queue of pending tasks.  Each entry is a pair (PROCEDURE . ARGUMENT).
(define task-queue (make-q))

;; Lock taken around every access to task-queue.
(define task-queue-mutex (make-mutex))

;; Condition variable signalled after a task is enqueued while a worker
;; already exists.
(define worker-wakeup-flag (make-condition-variable))

;; The worker thread object, or #f while no worker has been started.
;; Plain #f is used rather than #nil, which exists for Elisp interop.
(define worker #f)
;; Spawn the worker thread and return the thread object.
;;
;; The thread loops forever: it sleeps on worker-wakeup-flag until
;; task-queue is non-empty, removes one entry, and runs it.  An entry
;; that is a pair (PROC . ARG) is run as (PROC ARG); anything else is
;; dropped.
(define (start-worker)
  (begin-thread
   (while #t
     (let ((task (with-mutex task-queue-mutex
                   ;; Re-test the predicate after every wakeup.  Returning
                   ;; from the wait does not by itself prove the queue is
                   ;; non-empty, and deq! on an empty queue throws.
                   (while (q-empty? task-queue)
                     (wait-condition-variable worker-wakeup-flag
                                              task-queue-mutex))
                   (deq! task-queue))))
       ;; The queue lock is released before the task runs, so the task
       ;; itself may enqueue more work.
       (match task
         ((promt . msg)
          ;; Run under a fresh default prompt so that an `abort' made by
          ;; the task returns control to this loop.
          (% (promt msg)))
         (_ #f))))))
;; If channel CH has both a pending message and a waiting listener,
;; dequeue one of each and schedule the pair (LISTENER . MESSAGE) on
;; task-queue; otherwise do nothing.
;;
;; The wake-or-start decision is made while task-queue-mutex is held,
;; the same way `fiber' does it, so two callers cannot both observe a
;; missing worker and each start one.
(define (try-process-msg ch)
  (when (and (not (q-empty? (ch-messages ch)))
             (not (q-empty? (ch-listeners ch))))
    (let ((promt (deq! (ch-listeners ch)))
          (msg (deq! (ch-messages ch))))
      (with-mutex task-queue-mutex
        (enq! task-queue (cons promt msg))
        (if worker
            (signal-condition-variable worker-wakeup-flag)
            (set! worker (start-worker)))))))
;; call only inside fiber
;;
;; Append MSG to the pending messages of channel CH and then try to hand
;; a message to a waiting listener.  Both steps run while the channel's
;; mutex is held.
(define (ch-write ch msg)
  (let ((lock (ch-mutex ch)))
    (with-mutex lock
      (let ((pending (ch-messages ch)))
        (enq! pending msg))
      (try-process-msg ch))))
;; call only inside fiber
;;
;; Take one message from channel CH.
;;
;; If a message is already queued it is dequeued and returned.  Otherwise
;; the call aborts to the enclosing prompt, passing a procedure that
;; receives the captured continuation, registers it as a listener on CH
;; and retries the message/listener pairing.  ch-read then returns
;; whatever value that continuation is later invoked with.
(define (ch-read ch)
  (with-mutex (ch-mutex ch)
    (if (q-empty? (ch-messages ch))
        (abort
         (lambda (promt)
           ;; `abort' unwinds out of the surrounding with-mutex, so the
           ;; channel lock is no longer held when this procedure runs.
           ;; Take it again before touching the channel's queues.
           (with-mutex (ch-mutex ch)
             (enq! (ch-listeners ch) promt)
             (try-process-msg ch))))
        (deq! (ch-messages ch)))))
;; (fiber body ...)
;;
;; Wrap BODY in a one-argument procedure and queue it on task-queue,
;; paired with the symbol start-fiber as its argument.  Then start the
;; worker thread if there is none yet, or signal worker-wakeup-flag if
;; there is.  All of this runs while task-queue-mutex is held.
(define-syntax fiber
  (syntax-rules ()
    ((_ body ...)
     (let ((entry (lambda (ignored) body ...)))
       (with-mutex task-queue-mutex
         (enq! task-queue (cons entry 'start-fiber))
         (if (not worker)
             (set! worker (start-worker))
             (signal-condition-variable worker-wakeup-flag)))))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment