Created
October 9, 2010 08:55
-
-
Save kiwanami/618036 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;; concurrent.el --- Concurrent utility functions (PLAN!) | |
;; Copyright (C) 2010 SAKURAI Masashi | |
;; Author: SAKURAI Masashi <sakurai@liza2> | |
;; Keywords: deferred, async, thread | |
;; This program is free software; you can redistribute it and/or modify | |
;; it under the terms of the GNU General Public License as published by | |
;; the Free Software Foundation, either version 3 of the License, or | |
;; (at your option) any later version. | |
;; This program is distributed in the hope that it will be useful, | |
;; but WITHOUT ANY WARRANTY; without even the implied warranty of | |
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
;; GNU General Public License for more details. | |
;; You should have received a copy of the GNU General Public License | |
;; along with this program. If not, see <http://www.gnu.org/licenses/>. | |
;;; Commentary: | |
;; | |
(eval-when-compile | |
(require 'cl)) | |
(require 'deferred) | |
;;; Code: | |
(defmacro cc:aif (test-form then-form &rest else-forms) | |
(declare (debug ("test-form" form "then-form" form &rest form))) | |
`(let ((it ,test-form)) | |
(if it ,then-form ,@else-forms))) | |
(put 'cc:aif 'lisp-indent-function 2) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Generator | |
(defun cc:generator-replace-yield (tree) | |
(let (ret) | |
(loop for i in tree | |
do (cond | |
((eq i 'yield) | |
(push 'funcall ret) | |
(push i ret)) | |
((listp i) | |
(push (cc:generator-replace-yield i) ret)) | |
(t | |
(push i ret)))) | |
(nreverse ret))) | |
(defun cc:generator-line (line) | |
(cond | |
;; function object | |
((functionp line) | |
`(setq ,chain (deferred:nextc ,chain ,line))) | |
;; while loop form | |
((eq 'while (car line)) | |
(let ((condition (cadr line)) | |
(body (cddr line))) | |
`(setq ,chain | |
(deferred:nextc ,chain | |
(deferred:lambda (x) | |
(if ,condition | |
(deferred:nextc | |
(progn | |
,@(cc:generator-replace-yield body)) self))))))) | |
;; statement | |
(t | |
`(setq ,chain | |
(deferred:nextc ,chain | |
(deferred:lambda (x) ,(cc:generator-replace-yield line))))))) | |
(defmacro cc:generator (callback &rest body) | |
(let ((chain (gensym)) | |
(cc (gensym)) | |
(waiter (gensym))) | |
`(lexical-let* | |
(,chain | |
(,cc ,callback) | |
(,waiter (deferred:new)) | |
(yield (lambda (x) (funcall ,cc x) ,waiter))) | |
(setq ,chain ,waiter) | |
,@(loop for i in body | |
collect | |
(cc:generator-line i)) | |
(lambda () (deferred:callback ,waiter))))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Thread | |
(defun cc:thread-line (wait-time chain line) | |
(cond | |
;; function object | |
((functionp line) | |
`(setq ,chain (deferred:nextc ,chain ,line))) | |
;; while loop form | |
((eq 'while (car line)) | |
(let ((condition (cadr line)) | |
(body (cddr line)) | |
(retsym (gensym))) | |
`(setq ,chain | |
(deferred:nextc ,chain | |
(deferred:lambda (x) | |
(if ,condition | |
(deferred:nextc | |
(let ((,retsym (progn ,@body))) | |
(if (deferred-p ,retsym) ,retsym | |
(deferred:wait ,wait-time))) | |
self))))))) | |
;; statement | |
(t | |
`(setq ,chain | |
(deferred:nextc ,chain | |
(lambda (x) ,line)))))) | |
(defmacro cc:thread (wait-time &rest body) | |
(let ((chain (gensym)) | |
(dstart (gensym))) | |
`(lexical-let* | |
(,chain | |
(,dstart (deferred:new))) | |
(setq ,chain ,dstart) | |
,@(loop for i in body | |
collect | |
(cc:thread-line wait-time chain i)) | |
(deferred:callback ,dstart)))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Semaphore | |
(defstruct cc:semaphore max-permits permits waiting-deferreds) | |
(defun cc:semaphore-create(permits-num) | |
(make-cc:semaphore :max-permits permits-num :permits permits-num)) | |
(defun cc:semaphore-aquire(semaphore) | |
(cond | |
((< 0 (cc:semaphore-permits semaphore)) | |
(decf (cc:semaphore-permits semaphore)) | |
(deferred:succeed)) | |
(t | |
(let ((d (deferred:new))) | |
(push d (cc:semaphore-waiting-deferreds semaphore)) | |
d)))) | |
(defun cc:semaphore-release(semaphore) | |
(when (<= (cc:semaphore-max-permits semaphore) | |
(cc:semaphore-permits semaphore)) | |
(error "Too many calling semaphore-release. [max:%s <= permits:%s]" | |
(cc:semaphore-max-permits semaphore) | |
(cc:semaphore-permits semaphore))) | |
(let ((waiting-deferreds | |
(cc:semaphore-waiting-deferreds semaphore))) | |
(cond | |
(waiting-deferreds | |
(let* ((d (car (last waiting-deferreds)))) | |
(setf (cc:semaphore-waiting-deferreds semaphore) | |
(nbutlast waiting-deferreds)) | |
(deferred:callback-post d))) | |
(t | |
(incf (cc:semaphore-permits semaphore))))) | |
semaphore) | |
(defun cc:semaphore-release-all(semaphore) | |
(setf (cc:semaphore-permits semaphore) | |
(cc:semaphore-max-permits semaphore)) | |
(let ((ds (cc:semaphore-waiting-deferreds semaphore))) | |
(when ds | |
(setf (cc:semaphore-waiting-deferreds semaphore) nil)) | |
ds)) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Dataflow | |
(defstruct cc:dataflow key value deferred-list) | |
(defun cc:dataflow-environment (&optional parent-env test-func) | |
(list parent-env | |
(or test-func 'equal))) | |
(defmacro cc:dataflow-parent-environment (df) | |
`(car ,df)) | |
(defmacro cc:dataflow-test (df) | |
`(cadr ,df)) | |
(defmacro cc:dataflow-list (df) | |
`(cddr ,df)) | |
(defun cc:dataflow-get-object (df key) | |
(or | |
(loop for i in (cc:dataflow-list df) | |
with test = (cc:dataflow-test df) | |
if (funcall test key (cc:dataflow-key i)) | |
return i) | |
(deferred:aand | |
(cc:dataflow-parent-environment df) | |
(cc:dataflow-get-object it key)))) | |
(defun cc:dataflow-get (df key) | |
(let ((obj (cc:dataflow-get-object df key))) | |
(cond | |
((and obj (cc:dataflow-value obj)) | |
(deferred:succeed (cc:dataflow-value obj))) | |
(t | |
(unless obj | |
(setq obj (make-cc:dataflow :key key)) | |
(push obj (cc:dataflow-list df))) | |
(let ((d (deferred:new))) | |
(push d (cc:dataflow-deferred-list obj)) | |
d))))) | |
(defun cc:dataflow-get-sync (df key) | |
(let ((obj (cc:dataflow-get-object df key))) | |
(and obj (cc:dataflow-value obj)))) | |
(defun cc:dataflow-set (df key value) | |
(let ((obj (cc:dataflow-get-object df key))) | |
(cond | |
((and obj (cc:dataflow-value obj)) | |
;; overwrite! | |
(error "Can not set a dataflow value. The key [%s] has already had a value. NEW:[%s] OLD:[%s]" key value (cc:dataflow-value obj))) | |
(obj | |
;; value arrived and start deferred objects | |
(loop for i in (cc:dataflow-deferred-list obj) | |
do (deferred:callback-post i value)) | |
(setf (cc:dataflow-value obj) value) | |
(setf (cc:dataflow-deferred-list obj) nil)) | |
(t | |
;; just value arrived | |
(push (make-cc:dataflow :key key :value value) | |
(cc:dataflow-list df)))) | |
value)) | |
(defun cc:dataflow-clear (df key) | |
(setf (cc:dataflow-list df) | |
(loop for i in (cc:dataflow-list df) | |
with test = (cc:dataflow-test df) | |
unless (funcall test key (cc:dataflow-key i)) | |
collect i))) | |
(defun cc:dataflow-get-avalable-pairs (df) | |
(append | |
(loop for i in (cc:dataflow-list df) | |
for key = (cc:dataflow-key i) | |
for val = (cc:dataflow-value i) | |
if val collect (cons key val)) | |
(deferred:aand | |
(cc:dataflow-parent-environment df) | |
(cc:dataflow-get-avalable-pairs it)))) | |
' | |
(defun cc:dataflow-get-waiting-keys (df) | |
(append | |
(loop for i in (cc:dataflow-list df) | |
for key = (cc:dataflow-key i) | |
for val = (cc:dataflow-value i) | |
unless val collect key) | |
(deferred:aand | |
(cc:dataflow-parent-environment df) | |
(cc:dataflow-get-waiting-keys it)))) | |
(defun cc:dataflow-clear-all (df) | |
(setf (cc:dataflow-list df) nil)) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Signal / Connect | |
(defun cc:signal-channel (&optional name parent-channel) | |
(lexical-let | |
((ch (cons | |
(or name (deferred:uid)) ; name for debug | |
(cons | |
parent-channel ; parent-channel | |
nil)))) ; observers | |
(when parent-channel | |
(cc:signal-connect | |
parent-channel | |
t (lambda (event) | |
(destructuring-bind | |
(event-name event-args) event | |
(cc:signal-send | |
ch event-name event-args))))) | |
ch)) | |
(defmacro cc:signal-name (ch) | |
`(car ,ch)) | |
(defmacro cc:signal-parent-channel (ch) | |
`(cadr ,ch)) | |
(defmacro cc:signal-observers (ch) | |
`(cddr ,ch)) | |
(defun cc:signal-connect (channel event-sym &optional callback) | |
(let ((d (if callback | |
(deferred:new callback) | |
(deferred:new)))) | |
(push (cons event-sym d) | |
(cc:signal-observers channel)) | |
d)) | |
(defun cc:signal-send (channel event-sym &rest args) | |
(let ((observers (cc:signal-observers channel)) | |
(event (list event-sym args))) | |
(loop for i in observers | |
for name = (car i) | |
for d = (cdr i) | |
if (or (eq event-sym name) (eq t name)) | |
do (deferred:callback-post d event)))) | |
(defun cc:signal-send-global (channel event-sym &rest args) | |
(cc:aif (cc:signal-parent-channel channel) | |
(apply 'cc:signal-global it event-sym args) | |
(apply 'cc:signal-send channel event-sym args))) | |
(defun cc:signal-disconnect (channel deferred) | |
(let ((observers (cc:signal-observers channel)) deleted) | |
(setf | |
(cc:signal-observers channel) ; place | |
(loop for i in observers | |
for d = (cdr i) | |
unless (eq d deferred) | |
collect i | |
else | |
do (push i deleted))) | |
deleted)) | |
(defun cc:signal-disconnect-all (channel) | |
(setf | |
(cc:signal-observers channel) ; place | |
nil)) | |
(provide 'concurrent) | |
;;; concurrent.el ends here |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment