-
-
Save AkashaP/7081d9a6f1813b9e4c439f24b44684c9 to your computer and use it in GitHub Desktop.
POSIX message queues for Common Lisp.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This is free and unencumbered software released into the public domain. | |
(asdf:defsystem :mqueue | |
:name "mqueue" | |
:description "POSIX message queues for Common Lisp." | |
:version "0.0.0" | |
:author "Arto Bendiken <[email protected]> modified by Akasha" | |
:depends-on (:cffi) | |
:serial t | |
:components ((:file "mqueue"))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; SOURCE:: https://gist.github.com/artob/5994998 | |
;; Tweaks to: | |
;; - work | |
;; - specify message sizing | |
;; - easy string sending wrapper | |
;; - should be thread safe | |
;; note: queues opened by this fork are all R/W. This is needed by #'flush. | |
(defpackage :mqueue | |
(:use :cl :cffi :cffi-sys #+sbcl :sb-alien) | |
(:export | |
:load-library | |
:unload-library | |
:unlink-queue | |
:open-queue | |
:close-queue | |
:send-message-raw | |
:receive-message-raw | |
:send-message | |
:receive-message | |
:f-message | |
:send-message-timed | |
:receive-message-timed | |
:flush | |
:send-message-raw-timed | |
:receive-message-raw-timed | |
:get-attributes | |
:set-attributes | |
:non-blocking-p | |
:set-non-blocking | |
:with-unblocked) | |
(:shadow :error)) | |
(in-package :mqueue) | |
(defparameter *buffer* (list)) | |
(eval-when (:compile-toplevel :load-toplevel :execute) | |
(define-foreign-library librt | |
(:unix (:or "librt.so.0" "librt.so")) | |
(:darwin (:or "librt.0.dylib" "librt.dylib")) | |
(t (:default "librt"))) | |
;; 'thread-local buffers' | |
;; we have to do this because if multiple threads send/receive on the same queue, | |
;; they cannot share this buffer or they will step in on each other. | |
;; posix mqueue is specifically parallel compatible. no sense in degrading that here. | |
(if (loop for c in bordeaux-threads:*default-special-bindings* | |
if (and (listp c) (eq '*buffer* (car c))) | |
do (return nil) | |
finally (return t)) | |
(pushnew (cons '*buffer* (list)) | |
bordeaux-threads:*default-special-bindings* | |
:test #'equalp))) | |
;; #include <fcntl.h> | |
;; (defparameter +O_RDONLY+ 00000000) ;; Linux: /usr/include/asm-generic/fcntl.h | |
;; (defparameter +O_WRONLY+ 00000001) ;; Linux: /usr/include/asm-generic/fcntl.h | |
;; (defparameter +O_RDWR+ 00000002) ;; Linux: /usr/include/asm-generic/fcntl.h | |
;; (defparameter +O_CREAT+ 00000100) ;; Linux: /usr/include/asm-generic/fcntl.h | |
(defmacro errno () | |
`(sb-alien:get-errno)) | |
(defcfun ("strerror" %%strerror) :string (errnum :int)) | |
(defctype descriptor :int) ;; NOTE: platform-specific type | |
(defcstruct timespec (seconds :long) (nanoseconds :long)) | |
(define-condition error (cl:error) ()) | |
(defcstruct mq_attr (mq_flags :long) (mq_maxmsg :long) (mq_msgsize :long) (mq_curmsgs :long)) | |
(define-condition foreign-function-error (error) | |
((function :initarg :function :reader foreign-function-error-function) | |
(code :initarg :code :reader foreign-function-error-code) | |
(message :initarg :message :reader foreign-function-error-message)) | |
(:report (lambda (condition stream) | |
(format stream "~A failed with error code ~A: ~A" | |
(foreign-function-error-function condition) | |
(foreign-function-error-code condition) | |
(foreign-function-error-message condition))))) | |
(define-condition unknown-pathname (foreign-function-error) ()) ;; ENOENT (2) | |
(define-condition bad-file-descriptor (foreign-function-error) ()) ;; EBADF (9) | |
(define-condition disallowed-access (foreign-function-error) ()) ;; EACCES (13) | |
(defun foreign-function-error (errno function-name &optional message) | |
(declare (type fixnum errno) | |
(type string function-name)) | |
(cl:error (find-foreign-function-error-class errno) | |
:function function-name | |
:code errno | |
:message (or message (%%strerror errno)))) | |
(defun find-foreign-function-error-class (errno) | |
(declare (type fixnum errno)) | |
(case errno | |
(2 'unknown-pathname) | |
(9 'bad-file-descriptor) | |
(13 'disallowed-access) | |
(t 'foreign-function-error))) | |
(defmacro with-checked-ssize-result (cfun-name &rest body) | |
(let ((ssize-var (gensym)) | |
(errno-var (gensym))) | |
`(let ((,ssize-var (progn ,@body))) | |
(declare (type integer ,ssize-var)) | |
(if (>= ,ssize-var 0) | |
,ssize-var | |
(foreign-function-error (errno) ,cfun-name))))) | |
(defmacro with-checked-int-result (cfun-name &rest body) | |
`(with-checked-ssize-result ,cfun-name ,@body)) | |
(defun load-library (&key path version debug features) | |
"Loads the POSIX message queue library. | |
Must be called before invoking any foreign functions in the library." | |
(declare (type boolean debug) | |
(type list features)) | |
(load-foreign-library 'librt) | |
(values)) ;;; no meaningful return value | |
(defun unload-library () | |
"Unloads the POSIX message queue library." | |
(close-foreign-library 'librt) | |
(values)) ;;; no meaningful return value | |
;; int mq_unlink(const char* name) | |
(defcfun ("mq_unlink" %%unlink) :int (name :string)) | |
(defun unlink-queue (queue-name) | |
"Removes a message queue from the system." | |
(declare (type string queue-name)) | |
(with-checked-int-result "mq_unlink" | |
(%%unlink queue-name)) | |
(values)) ;;; no meaningful return value | |
(defun get-or-create-get-buffer (queue-descriptor &optional (size 8192)) | |
(let ((c (assoc queue-descriptor *buffer*))) | |
(if c | |
(let ((res (cdr c))) | |
(if (< (length res) size) | |
(setf (cdr (assoc queue-descriptor *buffer*)) | |
(make-array size :element-type '(unsigned-byte 8) :initial-element 0))) | |
res) | |
(let ((res (make-array size :element-type '(unsigned-byte 8) :initial-element 0))) | |
(defparameter *buffer* (list (cons queue-descriptor res))) | |
res)))) | |
;; mqd_t mq_open(const char* name, int oflag, mode_t mode, struct mq_attr* attr) | |
(defcfun ("mq_open" %%open) descriptor (name :string) (oflag :int) (mode :int) (attr :pointer)) | |
(defun open-queue (queue-name &optional nonblocking | |
;; NOTE 0 does not mean 'use system default'. | |
(max-msg-size-bytes 8192 p1) | |
(max-msgs 10 p2)) | |
"Opens or creates a message queue." | |
(declare (type string queue-name)) | |
(with-foreign-object (x '(:struct mq_attr)) | |
(with-foreign-slots ((mq_maxmsg mq_msgsize) x | |
(:struct mq_attr)) | |
(setf mq_maxmsg max-msgs | |
mq_msgsize max-msg-size-bytes) | |
(let ((res (%%open queue-name | |
;; (ecase (or direction :input) | |
;; (:input (logior flags +O_RDONLY+)) | |
;; (:output (logior flags +O_WRONLY+)) | |
;; (:io (logior flags +O_RDWR+))) | |
;; mode | |
(logior #o2 #o100 (if nonblocking #o4000 0)) | |
(logior #o200 #o400 #o020 #o040) | |
(if (or p1 p2) | |
(mem-aptr x :struct) | |
(cffi:null-pointer))))) | |
(get-or-create-get-buffer res) | |
res)))) | |
;; int mq_close(mqd_t mqdes) | |
(defcfun ("mq_close" %%close) :int (mqdes descriptor)) | |
(defun close-queue (queue-descriptor) | |
"Closes a message queue descriptor." | |
(declare (type fixnum queue-descriptor)) | |
(with-checked-int-result "mq_close" | |
(%%close queue-descriptor)) | |
(values)) ;;; no meaningful return value | |
;; int mq_send(mqd_t mqdes, const char* msg_ptr, size_t msg_len, unsigned msg_prio) | |
(defcfun ("mq_send" %%send) :int (mqdes descriptor) (msg-ptr :pointer) (msg-len :ulong) (msg-prio :uint)) | |
(defcfun ("mq_timedsend" %%timedsend) :int (mqdes descriptor) (msg-ptr (:pointer)) (msg-len :ulong) | |
(msg-prio :uint) (abs-timeout :pointer)) | |
(defun send-message-raw (queue-descriptor message-pointer message-size &key (message-priority 0)) | |
"Sends a message to a message queue." | |
(declare (type fixnum queue-descriptor message-size) | |
(type foreign-pointer message-pointer) | |
(type (or fixnum null) message-priority)) | |
(with-pointer-to-vector-data (ptr (get-or-create-get-buffer queue-descriptor message-size)) | |
(with-checked-int-result "mq_send" | |
(%%send queue-descriptor message-pointer message-size | |
message-priority))) | |
(values)) | |
(defun send-message (queue-descriptor message &key (message-priority 0)) | |
(declare (dynamic-extent message)) | |
(with-foreign-string (x message) | |
(send-message-raw queue-descriptor x (length message) :message-priority message-priority))) | |
(defun send-message-raw-timed (queue-descriptor message-pointer message-size | |
&key (message-priority 0) timeout) | |
"Sends a message to a message queue." | |
(declare (type fixnum queue-descriptor message-size) | |
(type foreign-pointer message-pointer) | |
(type (or fixnum null) message-priority) | |
(type fixnum timeout)) | |
(multiple-value-bind (secs mils) (floor timeout) | |
(with-foreign-object (tim '(:struct timespec)) | |
(with-foreign-slots ((seconds nanoseconds) tim (:struct timespec)) | |
(setf seconds (+ (sb-ext:get-time-of-day) secs) nanoseconds (* mils 1000000))) | |
(with-checked-int-result "mq_timedsend" | |
(%%timedsend queue-descriptor message-pointer message-size | |
message-priority tim)))) | |
;; (values) | |
) ;;; no meaningful return value | |
(defun send-message-timed (queue-descriptor message | |
&key (message-priority 0) timeout) | |
(declare (dynamic-extent message) | |
(fixnum message-priority) | |
(string message)) | |
(with-foreign-string (x message) | |
(send-message-raw-timed queue-descriptor x (length message) :message-priority message-priority :timeout timeout))) | |
;; ssize_t mq_receive(mqd_t mqdes, char* msg_ptr, size_t msg_len, unsigned* msg_prio) | |
(defcfun ("mq_receive" %%receive) :long (mqdes descriptor) (msg-ptr :pointer) (msg-len :ulong) (msg-prio :pointer)) | |
(defcfun ("mq_timedreceive" %%timedreceive) :long (mqdes descriptor) (msg-ptr (:pointer)) | |
(msg-len :ulong) (msg-prio :pointer) (abs-timeout :pointer)) | |
(defun receive-message-raw (queue-descriptor message-pointer message-size) | |
"Receives a message from a message queue. | |
Returns the number of bytes in the received message." | |
(declare (type fixnum queue-descriptor message-size) | |
(type foreign-pointer message-pointer)) | |
(with-foreign-object (priority :uint) | |
(with-checked-ssize-result "mq_receive" | |
(values (%%receive queue-descriptor message-pointer message-size priority) | |
(mem-aref priority :uint))))) | |
(defun receive-message (queue-descriptor) | |
(declare (type fixnum queue-descriptor)) | |
(let ((buf (get-or-create-get-buffer queue-descriptor))) | |
(with-pointer-to-vector-data (ptr buf) | |
(multiple-value-bind (length prio) | |
(receive-message-raw queue-descriptor ptr (length buf)) | |
(values (babel:octets-to-string buf :end length) prio))))) | |
(defun receive-message-raw-timed (queue-descriptor message-pointer message-size | |
&key timeout) | |
"Receives a message from a message queue. | |
Returns the number of bytes in the received message." | |
(declare (type fixnum queue-descriptor message-size) | |
(type foreign-pointer message-pointer)) | |
(multiple-value-bind (secs mils) (floor timeout) | |
(with-foreign-object (tim '(:struct timespec)) | |
(with-foreign-slots ((seconds nanoseconds) tim (:struct timespec)) | |
(setf seconds (+ (serapeum:get-unix-time) secs) nanoseconds (* mils 1000000))) | |
(with-foreign-object (prio :uint) | |
(with-checked-ssize-result "mq_timedreceive" | |
(values (%%timedreceive queue-descriptor message-pointer message-size prio tim) | |
(mem-aref prio :uint))))))) | |
(defun receive-message-timed (queue-descriptor message-size | |
&key timeout) | |
"Receives a message from a message queue. | |
Returns the number of bytes in the received message." | |
(declare (type fixnum queue-descriptor message-size)) | |
(let ((buf (get-or-create-get-buffer queue-descriptor message-size))) | |
(with-pointer-to-vector-data (ptr buf) | |
(multiple-value-bind (status prio) | |
(receive-message-raw-timed queue-descriptor ptr message-size timeout) | |
(values (octets-to-string buf) prio))))) | |
(defcfun ("mq_getattr" %%getattr) :long | |
"" | |
(mqdes descriptor) | |
(attr (:pointer (:struct mq_attr)))) | |
(defun get-attributes (queue-descriptor) | |
(with-foreign-object (x '(:struct mq_attr)) | |
(%%getattr queue-descriptor x) | |
(with-foreign-slots ((mq_flags mq_maxmsg mq_msgsize mq_curmsgs) x | |
(:struct mq_attr)) | |
(values mq_flags mq_maxmsg mq_msgsize mq_curmsgs)))) | |
(defun non-blocking-p (queue-descriptor) | |
(eq #o4000 (mqueue::get-attributes 35))) | |
(defcfun ("mq_setattr" %%setattr) :long | |
"" | |
(mqdes descriptor) | |
(attr (:pointer (:struct mq_attr))) | |
(oattr (:pointer (:struct mq_attr)))) | |
;; Note: as per the manual, the only thing that can be changed | |
;; is the non-blocking flag | |
(defun set-attributes (queue-descriptor flags) | |
(with-foreign-object (x '(:struct mq_attr)) | |
(with-foreign-object (y '(:struct mq_attr)) | |
(with-foreign-slots ((mq_flags) x (:struct mq_attr)) | |
(setf mq_flags flags) | |
(%%setattr queue-descriptor x y))))) | |
(defun set-non-blocking (queue-descriptor non-blocking) | |
(set-attributes queue-descriptor (if non-blocking #o4000 0))) | |
(defmacro with-unblocked (queue-descriptor &body body) | |
(let ((wasnt-blocked (gensym)) | |
(res (gensym))) | |
`(let ((,wasnt-blocked (non-blocking-p queue-descriptor)) | |
(,res nil)) | |
(unless ,wasnt-blocked | |
(set-non-blocking queue-descriptor nil)) | |
(let ((,res (progn ,@body))) | |
(unless ,wasnt-blocked | |
(set-non-blocking queue-descriptor t)) | |
,res)))) | |
(defun flush (queue-descriptor) | |
"Small hack function to clear old messages. | |
Messages are persistent unless ALL programs are disconnected from the queue, | |
no matter what you did." | |
(with-unblocked queue-descriptor | |
(handler-case (loop do (receive-message queue-descriptor)) | |
(foreign-function-error () t)))) | |
(defun f-message (queue-descriptor message) | |
"Like send-message, but if the queue is full, forces this message onto it" | |
(multiple-value-bind (mq_flags mq_maxmsg mq_msgsize mq_curmsgs) | |
(get-attributes queue-descriptor) | |
(if (>= mq_curmsgs mq_maxmsg) | |
(with-unblocked queue-descriptor | |
(receive-message queue-descriptor))) | |
(send-message queue-descriptor message))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment