Skip to content

Instantly share code, notes, and snippets.

@DarinM223
Last active March 18, 2021 23:22
Show Gist options
  • Save DarinM223/df85d839dc0006e50c75bf2c3bb392a8 to your computer and use it in GitHub Desktop.
Save DarinM223/df85d839dc0006e50c75bf2c3bb392a8 to your computer and use it in GitHub Desktop.
Asynchronous channel in Common Lisp using cl-async
(eval-when (:compile-toplevel :load-toplevel :execute)
(ql:quickload :cl-async)
(ql:quickload :iterate)
(ql:quickload :bordeaux-threads)
(ql:quickload :cl-speedy-queue)
(ql:quickload :access)
(use-package :iterate))
(access:enable-dot-syntax)
(defclass async-channel ()
((queue :initarg :queue)
(queue-lock :initform (bt:make-lock))
(send-ops :initarg :send-ops :initform nil)
(recv-ops :initarg :recv-ops :initform nil)
(send-queue :initform (cl-speedy-queue:make-queue 100))
(send-queue-lock :initform (bt:make-recursive-lock))
(recv-queue :initform (cl-speedy-queue:make-queue 100))
(recv-queue-lock :initform (bt:make-recursive-lock))))
(defun make-async-channel (size)
(let ((chan (make-instance 'async-channel
:queue (cl-speedy-queue:make-queue size))))
(flet ((call-queue-fn (queue lock)
(bt:with-recursive-lock-held (lock)
(when (not (cl-speedy-queue:queue-empty-p queue))
(funcall (cl-speedy-queue:dequeue queue))))))
(let ((send-notifier (as:make-notifier
(lambda ()
(call-queue-fn
#Dchan.send-queue
#Dchan.send-queue-lock))
:single-shot nil))
(recv-notifier (as:make-notifier
(lambda ()
(call-queue-fn
#Dchan.recv-queue
#Dchan.recv-queue-lock))
:single-shot nil)))
(setf #Dchan.send-ops send-notifier
#Dchan.recv-ops recv-notifier)
chan))))
(defun send (chan msg cb)
(let ((result (bt:with-lock-held (#Dchan.queue-lock)
(and (not (cl-speedy-queue:queue-full-p #Dchan.queue))
(progn
(cl-speedy-queue:enqueue msg #Dchan.queue)
t)))))
(if result
(progn
(as:trigger-notifier #Dchan.recv-ops)
(when (> (cl-speedy-queue:queue-length #Dchan.queue) 1)
(as:trigger-notifier #Dchan.send-ops))
(funcall cb))
(let ((f (lambda () (send chan msg cb))))
(bt:with-recursive-lock-held (#Dchan.send-queue-lock)
(cl-speedy-queue:enqueue f #Dchan.send-queue))))))
(defun recv (chan cb)
(multiple-value-bind (ok msg)
(bt:with-lock-held (#Dchan.queue-lock)
(and (not (cl-speedy-queue:queue-empty-p #Dchan.queue))
(values t (cl-speedy-queue:dequeue #Dchan.queue))))
(if ok
(progn
(as:trigger-notifier #Dchan.send-ops)
(when (> (cl-speedy-queue:queue-length #Dchan.queue) 1)
(as:trigger-notifier #Dchan.recv-ops))
(funcall cb msg))
(let ((f (lambda () (recv chan cb))))
(bt:with-recursive-lock-held (#Dchan.recv-queue-lock)
(cl-speedy-queue:enqueue f #Dchan.recv-queue))))))
; Basic example: makes a channel of size 10, sends 10 messages,
; and receives 10 messages back.
(as:with-event-loop ()
(let ((chan (make-async-channel 10)))
(iter (for i from 0 below 10)
(send chan i (lambda () (format t "Sent ~a~%" i))))
(iter (repeat 10)
(recv chan (lambda (msg) (format t "Received ~a~%" msg))))))
; Basic example but with threads. `i` is enclosed inside a lambda
; to prevent using a future value of `i`.
(as:with-event-loop ()
(let ((chan (make-async-channel 10)))
(iter (for i from 0 below 10)
(funcall
(lambda (i)
(bt:make-thread
(lambda ()
(send chan i (lambda () (format t "Sent ~a~%" i))))))
i))
(iter (repeat 10)
(bt:make-thread
(lambda ()
(recv chan (lambda (msg) (format t "Received ~a~%" msg))))))))
; Output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Sent 5
; Sent 6
; Sent 7
; Sent 8
; Sent 9
; Received 0
; Received 1
; Received 2
; Received 3
; Received 4
; Received 5
; Received 6
; Received 7
; Received 8
; Received 9
; Tests that `nil` values can be sent over the channel.
(as:with-event-loop ()
(let ((chan (make-async-channel 10)))
(iter (for i from 0 below 10)
(send chan nil (lambda () (format t "Sent ~a~%" i))))
(iter (repeat 10)
(recv chan (lambda (msg) (format t "Received ~a~%" msg))))))
; Output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Sent 5
; Sent 6
; Sent 7
; Sent 8
; Sent 9
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Received NIL
; Makes a channel of size 5 and attempts to send 10 messages with threads.
; The 10 receiving threads are created after waiting 2 seconds. What happens
; is that 5 messages get sent and then the sending is asynchronously "blocked"
; waiting for 2 seconds. Then the 5 messages are consumed at once and then
; the last 5 messages are sent and received back and forth.
(as:with-event-loop ()
(let ((chan (make-async-channel 5)))
(iter (for i from 0 below 10)
(funcall
(lambda (i)
(bt:make-thread
(lambda ()
(send chan i (lambda () (format t "Sent ~a~%" i))))))
i))
(as:delay
(lambda ()
(iter (repeat 10)
(bt:make-thread
(lambda ()
(recv chan (lambda (msg) (format t "Received ~a~%" msg)))))))
:time 2)))
; Output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Received 0
; Received 1
; Received 2
; Received 3
; Received 4
; Sent 6
; Received 6
; Sent 7
; Received 7
; Sent 8
; Received 8
; Sent 9
; Received 9
; Sent 5
; Received 5
; This example introduces a delay of one second between receiving each message.
; First 5 messages are sent as usual. Then whenever one message is received
; another message is sent. Finally, the last 5 messages are received every second.
(as:with-event-loop ()
(let ((chan (make-async-channel 5)))
(iter (for i from 0 below 10)
(funcall
(lambda (i)
(bt:make-thread
(lambda ()
(send chan i (lambda () (format t "Sent ~a~%" i))))))
i))
(iter (for i from 0 below 10)
(as:delay
(lambda ()
(recv chan (lambda (msg) (format t "Received ~a~%" msg))))
:time i))))
; Output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Received 0
; Sent 5
; Received 1
; Sent 7
; Received 2
; Sent 9
; Received 3
; Sent 8
; Received 4
; Sent 6
; Received 5
; Received 7
; Received 9
; Received 8
; Received 6
; Spawns two receiver tasks that loop indefinitely receiving messages from
; the channel. Because recursive calls use `as:delay`, task 1 and task 2 will
; alternate back and forth receiving the messages from the channel.
(as:with-event-loop ()
(let ((chan (make-async-channel 5)))
(iter (for i from 0 below 10)
(funcall
(lambda (i)
(send chan i (lambda () (format t "Sent ~a~%" i))))
i))
(labels ((rec1 ()
(recv chan (lambda (v)
(format t "Received ~a on task 1~%" v)
(as:delay #'rec1 :time 0))))
(rec2 ()
(recv chan (lambda (v)
(format t "Received ~a on task 2~%" v)
(as:delay #'rec2 :time 0)))))
(rec1)
(rec2))))
; Output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Received 0 on task 1
; Received 1 on task 2
; Received 2 on task 1
; Received 3 on task 2
; Received 4 on task 1
; Sent 5
; Received 5 on task 2
; Sent 6
; Received 6 on task 1
; Sent 7
; Received 7 on task 2
; Sent 8
; Received 8 on task 1
; Sent 9
; Received 9 on task 2
; Same as the previous example, but now both receivers are in
; threads and will fight over who will get the next value in the channel.
; Which thread gets which value is not fixed. `sleep` is used to prevent
; one thread from hogging all of the values in the channel.
(as:with-event-loop ()
(let ((chan (make-async-channel 5)))
(iter (for i from 0 below 10)
(funcall
(lambda (i)
(send chan i (lambda () (format t "Sent ~a~%" i))))
i))
(bt:make-thread
(lambda ()
(labels ((rec ()
(recv chan (lambda (v)
(format t "Received ~a on thread 1~%" v)
(sleep 0)
(rec)))))
(rec))))
(bt:make-thread
(lambda ()
(labels ((rec ()
(recv chan (lambda (v)
(format t "Received ~a on thread 2~%" v)
(sleep 0)
(rec)))))
(rec))))))
; One possible output:
; Sent 0
; Sent 1
; Sent 2
; Sent 3
; Sent 4
; Received 0 on thread 1
; Received 1 on thread 1
; Received 2 on thread 1
; Received 3 on thread 2
; Received 4 on thread 1
; Sent 5
; Received 5 on thread 2
; Sent 6
; Received 6 on thread 2
; Sent 7
; Received 7 on thread 1
; Sent 8
; Received 8 on thread 2
; Sent 9
; Received 9 on thread 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment