Created
April 22, 2015 06:36
-
-
Save mopemope/a0e9878a4ef9a2ea2310 to your computer and use it in GitHub Desktop.
ppool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool-nagger: a gen_server worker that repeatedly sends a task message
;; to a target process, driven by gen_server timeouts.
(defmodule ppool-nagger
  (behavior gen_server)
  (export all))
;; Start a linked nagger process.
;; The four arguments are packed into one tuple and handed to init/1.
(defun start-link (task delay max send-to)
  (let ((init-arg (tuple task delay max send-to)))
    (gen_server:start_link (MODULE) init-arg (list))))

;; Underscore-named alias of start-link/4, usable as a supervisor start function.
(defun start_link (task delay max send-to)
  (start-link task delay max send-to))
;; Synchronously ask the nagger at `pid` to stop.
(defun stop (pid)
  (let ((request 'stop))
    (gen_server:call pid request)))
;; gen_server init/1.
;; Keeps the #(task delay max send-to) tuple as the server state and uses
;; `delay` as the gen_server timeout, so the first 'timeout arrives after it.
(defun init
  (((= (tuple _task delay _max _send-to) args))
   (tuple 'ok args delay)))
;; gen_server handle_call/3.
;; 'stop replies ok and terminates normally; any other request is
;; ignored without a reply.
(defun handle_call
  (('stop _caller data)
   (tuple 'stop 'normal 'ok data))
  ((_request _caller data)
   (tuple 'noreply data)))
;; gen_server handle_cast/2: every cast is ignored and the state is kept.
(defun handle_cast (_request data)
  (tuple 'noreply data))
;; gen_server handle_info/2, handling only the 'timeout message.
;; Each timeout sends #(self task) to `send-to`, then:
;;   max = infinity -> keep going with the same state and timeout
;;   max =< 1       -> stop normally with the counter set to 0
;;   otherwise      -> decrement the counter and re-arm the timeout
(defun handle_info
  (('timeout (tuple task delay max send-to))
   (! send-to (tuple (self) task))
   (case max
     ('infinity
      (tuple 'noreply (tuple task delay max send-to) delay))
     (n (when (=< n 1))
      (tuple 'stop 'normal (tuple task delay 0 send-to)))
     (n
      (tuple 'noreply (tuple task delay (- n 1) send-to) delay)))))
;; gen_server code_change/3: the state is carried over unchanged.
(defun code_change (_old-vsn data _extra)
  (tuple 'ok data))

;; gen_server terminate/2: nothing to clean up.
(defun terminate (_reason _data)
  'ok)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool-serv: gen_server that tracks free worker slots, monitors running
;; workers and queues tasks when no slot is free.
(defmodule ppool-serv
  (behavior gen_server)
  (export all))
;; Start the pool server, locally registered as `name`, without a link.
;; Only accepts an atom name and an integer limit.
(defun start
  ((name limit sup mfa) (when (andalso (is_atom name) (is_integer limit)))
   (let ((reg-name (tuple 'local name))
         (init-arg (tuple limit mfa sup)))
     (gen_server:start reg-name (MODULE) init-arg (list)))))

;; Same as start/4, but links the server to the calling process.
(defun start-link
  ((name limit sup mfa) (when (andalso (is_atom name) (is_integer limit)))
   (let ((reg-name (tuple 'local name))
         (init-arg (tuple limit mfa sup)))
     (gen_server:start_link reg-name (MODULE) init-arg (list)))))

;; Underscore-named alias of start-link/4, usable in supervisor child specs.
(defun start_link (name limit sup mfa)
  (start-link name limit sup mfa))
;; Ask pool `name` to start a worker with `args` right away.
;; Returns the server's reply (#(ok Pid) or noalloc).
(defun run (name args)
  (let ((request (tuple 'run args)))
    (gen_server:call name request)))
;; Submit a task and wait (with no timeout) until the server replies.
(defun sync-queue (name args)
  (let ((request (tuple 'sync args)))
    (gen_server:call name request 'infinity)))

;; Underscore-named alias of sync-queue/2.
(defun sync_queue (name args)
  (sync-queue name args))
;; Submit a task with a cast; the caller does not wait for a reply.
(defun async-queue (name args)
  (let ((request (tuple 'async args)))
    (gen_server:cast name request)))

;; Underscore-named alias of async-queue/2.
(defun async_queue (name args)
  (async-queue name args))
;; Synchronously ask pool server `name` to stop.
(defun stop (name)
  (let ((request 'stop))
    (gen_server:call name request)))
;; Build the child spec for the worker supervisor: a temporary supervisor
;; child with id worker_sup, started through ppool-worker-sup:start_link/1
;; with `mfa`, and a 10000 ms shutdown.
(defun spec (mfa)
  (let ((start-fun (tuple 'ppool-worker-sup 'start_link (list mfa)))
        (modules (list 'ppool-worker-sup)))
    (tuple 'worker_sup start-fun 'temporary 10000 'supervisor modules)))
;; Server state record.
;;   limit - number of worker slots still free (decremented when a worker
;;           starts, incremented when one goes down with an empty queue)
;;   sup   - pid of the worker supervisor, filled in once it has been started
;;   refs  - gb_sets set of monitor references for running workers
;;   queue - queue of tasks waiting for a free slot
(defrecord state
  (limit 0)
  (sup 'undefined)
  (refs 'undefined)
  (queue (queue:new)))
;; gen_server callbacks
;; gen_server:init/1
;; Posts a start_worker_supervisor message to itself (handled later in
;; handle_info/2) and returns a state with the given limit and an empty
;; set of monitor references.
(defun init
  (((tuple limit mfa sup))
   (let ((boot-msg (tuple 'start_worker_supervisor sup mfa)))
     (! (self) boot-msg))
   (tuple 'ok (make-state limit limit refs (gb_sets:empty)))))
;; gen_server:handle_info/2
;;   #(DOWN Ref process Pid Reason)       - a monitored process went down; if
;;       Ref is one of ours, hand over to handle-down-worker/2.
;;   #(start_worker_supervisor Sup MFA)   - start the worker supervisor under
;;       Sup, link to it and remember its pid in the state.
;;   anything else                        - logged and ignored.
(defun handle_info
  (((tuple 'DOWN mref 'process _pid _why) (= (match-state refs monitors) st))
   (io:format "received down msg ~n")
   (if (gb_sets:is_element mref monitors)
     (handle-down-worker mref st)
     (tuple 'noreply st)))
  (((tuple 'start_worker_supervisor sup mfa) (= (match-state) st))
   (let (((tuple 'ok pid) (supervisor:start_child sup (spec mfa))))
     (link pid)
     (tuple 'noreply (set-state-sup st pid))))
  ((other st)
   (io:format "Unknwon msg : ~p~n" (list other))
   (tuple 'noreply st)))
;; gen_server:handle_call/3
;;
;; Requests:
;;   #(run Args)  - if a slot is free, start a worker, monitor it and reply
;;                  #(ok Pid); otherwise reply noalloc.
;;   #(sync Args) - if a slot is free, behave like run; otherwise queue
;;                  #(From Args) and send no reply yet. The caller is
;;                  answered from handle-down-worker/2 once a slot frees up.
;;   stop         - reply ok and stop the server normally.
;; Any other request is ignored without a reply.
(defun handle_call
  ;; run
  ((`#(run ,args) _from (= (match-state limit n sup sup refs r) s)) (when (> n 0))
   (let* ((`#(ok ,pid) (supervisor:start_child sup args))
          (ref (erlang:monitor 'process pid)))
     `#(reply #(ok ,pid) ,(set-state s limit (- n 1) refs (gb_sets:add ref r)))))
  ((`#(run ,_args) _from (= (match-state limit n) s)) (when (=< n 0))
   `#(reply noalloc ,s))
  ;; sync
  ((`#(sync ,args) _from (= (match-state limit n sup sup refs r) s)) (when (> n 0))
   (let* ((`#(ok ,pid) (supervisor:start_child sup args))
          (ref (erlang:monitor 'process pid)))
     `#(reply #(ok ,pid) ,(set-state s limit (- n 1) refs (gb_sets:add ref r)))))
  ;; No free slot: queue the caller. The pattern must be match-state;
  ;; make-state is the constructor, and used as a pattern it only matches a
  ;; record whose other fields still hold their defaults, so the request
  ;; was never queued and the caller waited forever.
  ((`#(sync ,args) from (= (match-state queue q) s))
   `#(noreply ,(set-state s queue (queue:in (tuple from args) q))))
  ;; stop
  (('stop _from state)
   `#(stop normal ok ,state))
  ((_msg _from state)
   `#(noreply ,state)))
;; gen_server:handle_cast/2
;;   #(async Args) with a free slot - start and monitor a worker now.
;;   #(async Args) with no free slot - append Args to the queue.
;;   anything else                   - ignored.
(defun handle_cast
  (((tuple 'async args) (= (match-state limit free sup sup refs monitors) st))
   (when (> free 0))
   (let* (((tuple 'ok pid) (supervisor:start_child sup args))
          (mref (erlang:monitor 'process pid)))
     (tuple 'noreply
            (set-state st limit (- free 1) refs (gb_sets:add mref monitors)))))
  (((tuple 'async args) (= (match-state limit free queue pending) st))
   (when (=< free 0))
   (tuple 'noreply (set-state-queue st (queue:in args pending))))
  ((_request st)
   (tuple 'noreply st)))
;; gen_server code_change/3: the state is carried over unchanged.
(defun code_change (_old-vsn data _extra)
  (tuple 'ok data))

;; gen_server terminate/2: nothing to clean up.
(defun terminate (_reason _data)
  'ok)
;; handle-down-worker/2
;; Called when a monitored worker (identified by `dead-ref`) has gone down.
;; Takes the next queued task, if any, and starts it in the freed slot:
;;   queued #(From Args) - a waiting sync caller; start the worker and reply
;;                         #(ok Pid) to From.
;;   queued Args         - an async task; just start the worker.
;;   empty queue         - give the slot back by incrementing the limit.
;; In every case `dead-ref` is removed from the set of monitor references.
(defun handle-down-worker
  ((dead-ref (= (match-state limit free sup sup refs monitors) st))
   (case (queue:out (state-queue st))
     ((tuple (tuple 'value (tuple from args)) rest)
      (let* (((tuple 'ok pid) (supervisor:start_child sup args))
             (mref (erlang:monitor 'process pid))
             (updated (gb_sets:insert mref (gb_sets:delete dead-ref monitors))))
        (gen_server:reply from (tuple 'ok pid))
        (tuple 'noreply (set-state st refs updated queue rest))))
     ((tuple (tuple 'value args) rest)
      (let* (((tuple 'ok pid) (supervisor:start_child sup args))
             (mref (erlang:monitor 'process pid))
             (updated (gb_sets:insert mref (gb_sets:delete dead-ref monitors))))
        (tuple 'noreply (set-state st refs updated queue rest))))
     ((tuple 'empty _rest)
      (tuple 'noreply
             (set-state st
                        limit (+ free 1)
                        refs (gb_sets:delete dead-ref monitors)))))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool-sup: per-pool supervisor whose only static child is the ppool-serv
;; server for that pool.
(defmodule ppool-sup
  (behavior supervisor)
  (export all))
;; Start the (unregistered) pool supervisor, linked to the caller.
(defun start-link (name limit mfa)
  (let ((init-arg (tuple name limit mfa)))
    (supervisor:start_link (MODULE) init-arg)))

;; Underscore-named alias of start-link/3, usable in supervisor child specs.
(defun start_link (name limit mfa)
  (start-link name limit mfa))
;; supervisor init/1.
;; one_for_all strategy allowing 1 restart per 3600 seconds, with a single
;; permanent worker child: the ppool-serv server. The server is given this
;; supervisor's pid, (self), as the third start argument.
(defun init
  (((tuple name limit mfa))
   (let* ((strategy (tuple 'one_for_all 1 3600))
          (start-fun (tuple 'ppool-serv 'start_link
                            (list name limit (self) mfa)))
          (serv-spec (tuple 'serv start-fun
                            'permanent 5000 'worker (list 'ppool-serv))))
     (tuple 'ok (tuple strategy (list serv-spec))))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool-supersup: top-level supervisor, registered locally as ppool, under
;; which individual pool supervisors are started and stopped dynamically.
(defmodule ppool-supersup
  (behavior supervisor)
  (export all))
;; Start the top-level supervisor, registered locally as ppool.
(defun start-link ()
  (let ((reg-name (tuple 'local 'ppool)))
    (supervisor:start_link reg-name (MODULE) (list))))

;; Underscore-named alias of start-link/0.
(defun start_link ()
  (start-link))
;; Kill the process registered as ppool, if there is one.
;; Returns 'ok when nothing is registered under that name.
(defun stop ()
  (let ((target (whereis 'ppool)))
    (if (is_pid target)
      (exit target 'kill)
      'ok)))
;; supervisor init/1.
;; one_for_all strategy allowing 6 restarts per 3600 seconds, with no static
;; children.
(defun init
  (('())
   (let ((strategy (tuple 'one_for_all 6 3600))
         (children (list)))
     (tuple 'ok (tuple strategy children)))))
;; Start a pool supervisor named `name` as a permanent child of ppool,
;; with a 10500 ms shutdown.
(defun start-pool (name limit mfa)
  (let* ((start-fun (tuple 'ppool-sup 'start_link (list name limit mfa)))
         (child-spec (tuple name start-fun
                            'permanent 10500 'supervisor (list 'ppool-sup))))
    (supervisor:start_child 'ppool child-spec)))

;; Underscore-named alias of start-pool/3.
(defun start_pool (name limit mfa)
  (start-pool name limit mfa))
;; Terminate pool `name` and remove its child spec from ppool.
;; Returns the result of supervisor:delete_child/2.
(defun stop-pool (name)
  (let ((top-sup 'ppool))
    (supervisor:terminate_child top-sup name)
    (supervisor:delete_child top-sup name)))

;; Underscore-named alias of stop-pool/1.
(defun stop_pool (name)
  (stop-pool name))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool-worker-sup: simple_one_for_one supervisor for a pool's workers.
(defmodule ppool-worker-sup
  (behavior supervisor)
  (export all))
;; Start the worker supervisor; `mfa` must be a 3-element tuple.
(defun start-link
  (((= (tuple _ _ _) mfa))
   (supervisor:start_link (MODULE) mfa)))

;; Underscore-named alias of start-link/1, usable in supervisor child specs.
(defun start_link (mfa)
  (start-link mfa))
;; supervisor init/1.
;; simple_one_for_one strategy allowing 5 restarts per 3600 seconds.
;; The single child template is a temporary worker started through the
;; given #(m f a), with a 5000 ms shutdown.
(defun init
  (((= (tuple m _f _a) mfa))
   (let ((strategy (tuple 'simple_one_for_one 5 3600))
         (worker-spec (tuple 'ppool-worker mfa
                             'temporary 5000 'worker (list m))))
     (tuple 'ok (tuple strategy (list worker-spec))))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; ppool: public API of the process pool. Every function delegates to
;; ppool-supersup or ppool-serv.
(defmodule ppool
  (export all))
;; Start the pool application's top-level supervisor.
(defun start-link ()
  (: ppool-supersup start-link))

;; Underscore-named alias of start-link/0.
(defun start_link ()
  (start-link))

;; Stop the top-level supervisor.
(defun stop ()
  (: ppool-supersup stop))
;; Start a pool called `name` with `limit` worker slots.
;; The third argument must be a 3-element #(m f a) tuple.
(defun start-pool
  ((name limit (= (tuple _m _f _a) mfa))
   (: ppool-supersup start-pool name limit mfa)))

;; Underscore-named alias of start-pool/3.
(defun start_pool (name limit mfa)
  (start-pool name limit mfa))

;; Stop the pool called `name`.
(defun stop-pool (name)
  (: ppool-supersup stop-pool name))

;; Underscore-named alias of stop-pool/1.
(defun stop_pool (name)
  (stop-pool name))
;; Run a task in pool `name` immediately; delegates to ppool-serv:run/2.
(defun run (name args)
  (: ppool-serv run name args))

;; Submit a task without waiting; delegates to ppool-serv:async-queue/2.
(defun async-queue (name args)
  (: ppool-serv async-queue name args))

;; Underscore-named alias of async-queue/2.
(defun async_queue (name args)
  (async-queue name args))

;; Submit a task and wait for the server's reply; delegates to
;; ppool-serv:sync-queue/2.
(defun sync-queue (name args)
  (: ppool-serv sync-queue name args))

;; Underscore-named alias of sync-queue/2.
(defun sync_queue (name args)
  (sync-queue name args))
;; Manual smoke test: starts a 2-slot pool of naggers, exercises run,
;; async-queue and sync-queue in turn (sleeping 10 s and flushing the
;; caller's mailbox after each round), then tears everything down.
(defun test ()
  (let ((me (self))
        (wait-and-flush (lambda ()
                          (timer:sleep 10000)
                          (c:flush))))
    (ppool:start-link)
    (ppool:start-pool 'nagger 2 (tuple 'ppool-nagger 'start_link (list)))
    ;; Three immediate runs against a pool with two slots.
    (lists:foreach (lambda (job) (ppool:run 'nagger job))
                   (list (list "finish the chapter!" 5000 2 me)
                         (list "Watch a good movie" 5000 2 me)
                         (list "clean up bit a" 5000 2 me)))
    (funcall wait-and-flush)
    ;; Asynchronous submissions.
    (lists:foreach (lambda (job) (ppool:async-queue 'nagger job))
                   (list (list "Pay the bills" 2000 1 me)
                         (list "Take a shower" 2000 1 me)))
    (funcall wait-and-flush)
    ;; Synchronous submissions.
    (lists:foreach (lambda (job) (ppool:sync-queue 'nagger job))
                   (list (list "Pet a dog" 2000 1 me)
                         (list "Make some noise" 2000 1 me)))
    (funcall wait-and-flush)
    (ppool:stop-pool 'nagger)
    (ppool:stop)
    (c:flush)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment