Last active
March 18, 2021 23:22
-
-
Save DarinM223/df85d839dc0006e50c75bf2c3bb392a8 to your computer and use it in GitHub Desktop.
Asynchronous channel in Common Lisp using cl-async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(eval-when (:compile-toplevel :load-toplevel :execute) | |
(ql:quickload :cl-async) | |
(ql:quickload :iterate) | |
(ql:quickload :bordeaux-threads) | |
(ql:quickload :cl-speedy-queue) | |
(ql:quickload :access) | |
(use-package :iterate)) | |
(access:enable-dot-syntax) | |
(defclass async-channel () | |
((queue :initarg :queue) | |
(queue-lock :initform (bt:make-lock)) | |
(send-ops :initarg :send-ops :initform nil) | |
(recv-ops :initarg :recv-ops :initform nil) | |
(send-queue :initform (cl-speedy-queue:make-queue 100)) | |
(send-queue-lock :initform (bt:make-recursive-lock)) | |
(recv-queue :initform (cl-speedy-queue:make-queue 100)) | |
(recv-queue-lock :initform (bt:make-recursive-lock)))) | |
(defun make-async-channel (size) | |
(let ((chan (make-instance 'async-channel | |
:queue (cl-speedy-queue:make-queue size)))) | |
(flet ((call-queue-fn (queue lock) | |
(bt:with-recursive-lock-held (lock) | |
(when (not (cl-speedy-queue:queue-empty-p queue)) | |
(funcall (cl-speedy-queue:dequeue queue)))))) | |
(let ((send-notifier (as:make-notifier | |
(lambda () | |
(call-queue-fn | |
#Dchan.send-queue | |
#Dchan.send-queue-lock)) | |
:single-shot nil)) | |
(recv-notifier (as:make-notifier | |
(lambda () | |
(call-queue-fn | |
#Dchan.recv-queue | |
#Dchan.recv-queue-lock)) | |
:single-shot nil))) | |
(setf #Dchan.send-ops send-notifier | |
#Dchan.recv-ops recv-notifier) | |
chan)))) | |
(defun send (chan msg cb) | |
(let ((result (bt:with-lock-held (#Dchan.queue-lock) | |
(and (not (cl-speedy-queue:queue-full-p #Dchan.queue)) | |
(progn | |
(cl-speedy-queue:enqueue msg #Dchan.queue) | |
t))))) | |
(if result | |
(progn | |
(as:trigger-notifier #Dchan.recv-ops) | |
(when (> (cl-speedy-queue:queue-length #Dchan.queue) 1) | |
(as:trigger-notifier #Dchan.send-ops)) | |
(funcall cb)) | |
(let ((f (lambda () (send chan msg cb)))) | |
(bt:with-recursive-lock-held (#Dchan.send-queue-lock) | |
(cl-speedy-queue:enqueue f #Dchan.send-queue)))))) | |
(defun recv (chan cb) | |
(multiple-value-bind (ok msg) | |
(bt:with-lock-held (#Dchan.queue-lock) | |
(and (not (cl-speedy-queue:queue-empty-p #Dchan.queue)) | |
(values t (cl-speedy-queue:dequeue #Dchan.queue)))) | |
(if ok | |
(progn | |
(as:trigger-notifier #Dchan.send-ops) | |
(when (> (cl-speedy-queue:queue-length #Dchan.queue) 1) | |
(as:trigger-notifier #Dchan.recv-ops)) | |
(funcall cb msg)) | |
(let ((f (lambda () (recv chan cb)))) | |
(bt:with-recursive-lock-held (#Dchan.recv-queue-lock) | |
(cl-speedy-queue:enqueue f #Dchan.recv-queue)))))) | |
; Basic example: makes a channel of size 10, sends 10 messages, | |
; and receives 10 messages back. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 10))) | |
(iter (for i from 0 below 10) | |
(send chan i (lambda () (format t "Sent ~a~%" i)))) | |
(iter (repeat 10) | |
(recv chan (lambda (msg) (format t "Received ~a~%" msg)))))) | |
; Basic example but with threads. `i` is enclosed inside a lambda | |
; to prevent using a future value of `i`. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 10))) | |
(iter (for i from 0 below 10) | |
(funcall | |
(lambda (i) | |
(bt:make-thread | |
(lambda () | |
(send chan i (lambda () (format t "Sent ~a~%" i)))))) | |
i)) | |
(iter (repeat 10) | |
(bt:make-thread | |
(lambda () | |
(recv chan (lambda (msg) (format t "Received ~a~%" msg)))))))) | |
; Output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Sent 5 | |
; Sent 6 | |
; Sent 7 | |
; Sent 8 | |
; Sent 9 | |
; Received 0 | |
; Received 1 | |
; Received 2 | |
; Received 3 | |
; Received 4 | |
; Received 5 | |
; Received 6 | |
; Received 7 | |
; Received 8 | |
; Received 9 | |
; Tests that `nil` values can be sent over the channel. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 10))) | |
(iter (for i from 0 below 10) | |
(send chan nil (lambda () (format t "Sent ~a~%" i)))) | |
(iter (repeat 10) | |
(recv chan (lambda (msg) (format t "Received ~a~%" msg)))))) | |
; Output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Sent 5 | |
; Sent 6 | |
; Sent 7 | |
; Sent 8 | |
; Sent 9 | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Received NIL | |
; Makes a channel of size 5 and attempts to send 10 messages with threads. | |
; The 10 receiving threads are created after waiting 2 seconds. What happens | |
; is that 5 messages get sent and then the sending is asynchronously "blocked" | |
; waiting for 2 seconds. Then the 5 messages are consumed at once and then | |
; the last 5 messages are sent and received back and forth. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 5))) | |
(iter (for i from 0 below 10) | |
(funcall | |
(lambda (i) | |
(bt:make-thread | |
(lambda () | |
(send chan i (lambda () (format t "Sent ~a~%" i)))))) | |
i)) | |
(as:delay | |
(lambda () | |
(iter (repeat 10) | |
(bt:make-thread | |
(lambda () | |
(recv chan (lambda (msg) (format t "Received ~a~%" msg))))))) | |
:time 2))) | |
; Output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Received 0 | |
; Received 1 | |
; Received 2 | |
; Received 3 | |
; Received 4 | |
; Sent 6 | |
; Received 6 | |
; Sent 7 | |
; Received 7 | |
; Sent 8 | |
; Received 8 | |
; Sent 9 | |
; Received 9 | |
; Sent 5 | |
; Received 5 | |
; This example introduces a delay of one second between receiving each message. | |
; First 5 messages are sent as usual. Then whenever one message is received | |
; another message is sent. Finally, the last 5 messages are received every second. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 5))) | |
(iter (for i from 0 below 10) | |
(funcall | |
(lambda (i) | |
(bt:make-thread | |
(lambda () | |
(send chan i (lambda () (format t "Sent ~a~%" i)))))) | |
i)) | |
(iter (for i from 0 below 10) | |
(as:delay | |
(lambda () | |
(recv chan (lambda (msg) (format t "Received ~a~%" msg)))) | |
:time i)))) | |
; Output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Received 0 | |
; Sent 5 | |
; Received 1 | |
; Sent 7 | |
; Received 2 | |
; Sent 9 | |
; Received 3 | |
; Sent 8 | |
; Received 4 | |
; Sent 6 | |
; Received 5 | |
; Received 7 | |
; Received 9 | |
; Received 8 | |
; Received 6 | |
; Spawns two receiver tasks that loop indefinitely receiving messages from | |
; the channel. Because recursive calls use `as:delay`, task 1 and task 2 will | |
; alternate back and forth receiving the messages from the channel. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 5))) | |
(iter (for i from 0 below 10) | |
(funcall | |
(lambda (i) | |
(send chan i (lambda () (format t "Sent ~a~%" i)))) | |
i)) | |
(labels ((rec1 () | |
(recv chan (lambda (v) | |
(format t "Received ~a on task 1~%" v) | |
(as:delay #'rec1 :time 0)))) | |
(rec2 () | |
(recv chan (lambda (v) | |
(format t "Received ~a on task 2~%" v) | |
(as:delay #'rec2 :time 0))))) | |
(rec1) | |
(rec2)))) | |
; Output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Received 0 on task 1 | |
; Received 1 on task 2 | |
; Received 2 on task 1 | |
; Received 3 on task 2 | |
; Received 4 on task 1 | |
; Sent 5 | |
; Received 5 on task 2 | |
; Sent 6 | |
; Received 6 on task 1 | |
; Sent 7 | |
; Received 7 on task 2 | |
; Sent 8 | |
; Received 8 on task 1 | |
; Sent 9 | |
; Received 9 on task 2 | |
; Same as the previous example, but now both receivers are in | |
; threads and will fight over who will get the next value in the channel. | |
; Which thread gets which value is not fixed. `sleep` is used to prevent | |
; one thread from hogging all of the values in the channel. | |
(as:with-event-loop () | |
(let ((chan (make-async-channel 5))) | |
(iter (for i from 0 below 10) | |
(funcall | |
(lambda (i) | |
(send chan i (lambda () (format t "Sent ~a~%" i)))) | |
i)) | |
(bt:make-thread | |
(lambda () | |
(labels ((rec () | |
(recv chan (lambda (v) | |
(format t "Received ~a on thread 1~%" v) | |
(sleep 0) | |
(rec))))) | |
(rec)))) | |
(bt:make-thread | |
(lambda () | |
(labels ((rec () | |
(recv chan (lambda (v) | |
(format t "Received ~a on thread 2~%" v) | |
(sleep 0) | |
(rec))))) | |
(rec)))))) | |
; One possible output: | |
; Sent 0 | |
; Sent 1 | |
; Sent 2 | |
; Sent 3 | |
; Sent 4 | |
; Received 0 on thread 1 | |
; Received 1 on thread 1 | |
; Received 2 on thread 1 | |
; Received 3 on thread 2 | |
; Received 4 on thread 1 | |
; Sent 5 | |
; Received 5 on thread 2 | |
; Sent 6 | |
; Received 6 on thread 2 | |
; Sent 7 | |
; Received 7 on thread 1 | |
; Sent 8 | |
; Received 8 on thread 2 | |
; Sent 9 | |
; Received 9 on thread 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment