Last active
December 18, 2015 01:49
pipejine shutdown
(ns pipedream.core
  (:gen-class)
  (:require [clojure.tools.logging :as log]
            [pipejine.core :as pipe]))

(defn pipeline []
  (let [q1 (pipe/new-queue {:name "q1"
                            :queue-size 5
                            :number-of-consumer-threads 5
                            :number-of-producers 1})
        q2 (pipe/new-queue {:name "q2"
                            :queue-size 2
                            :number-of-consumer-threads 1
                            :number-of-producers 1
                            :partition 2}) ;; partition queues should only have one thread!
        q3 (pipe/new-queue {:name "q3"
                            :queue-size 3
                            :number-of-consumer-threads 3
                            :number-of-producers 1})
        q4 (pipe/new-queue {:name "q4"
                            :queue-size 10
                            :number-of-consumer-threads 1
                            :number-of-producers 2
                            :partition :all
                            :debug true})
        logger (pipe/spawn-logger q1 q2 q3 q4)]
    (pipe/spawn-consumers q1 #(do
                                (pipe/produce q2 (inc %)) ;; q1 workers put items on q2
                                (pipe/produce q3 (dec %)) ;; ... and on q3
                                ))
    (pipe/spawn-consumers q2 #(do
                                (log/info "q2 got: " %)
                                (doseq [d %]
                                  (pipe/produce q4 (* d d)))))
    (pipe/spawn-consumers q3 #(do
                                (Thread/sleep 10)
                                (pipe/produce q4 (/ % 2))))
    (pipe/producer-of q1 q2 q3)
    (pipe/producer-of q2 q4)
    (pipe/producer-of q3 q4)
    (pipe/spawn-supervisor q4 #(log/info "pipeline exhausted!"))
    ;; example of read-seq; this could just as well be another consumer (as above)
    (future (log/info "***" (first (pipe/read-seq q4))))
    (dotimes [i 20] ;; seed q1 with data
      (Thread/sleep 10)
      (log/info "prod: " i)
      (pipe/produce q1 i))
    (pipe/produce-done q1) ;; mark that we're done putting data in q1
    ;; wait for q4 to finish... not sure if this is the best way
    (.await (:producers-done q4))
    ;; (logger)
    ))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  ;; the pipeline is expected to stop once all queues have shut down
  (pipeline)
  (shutdown-agents)
  (println "Hello, World!"))

(defproject pipedream "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [pipejine "0.1.1"]]
  :main pipedream.core)

Also, add (shutdown-agents) as the last call in -main.
The updated version works as you suggested. Thanks! :)
There's no need to shut down queues manually: a queue's workers stop once all of its producers are done. The supervisor also finishes when the queue shuts down.
In this case you shut down the queues before the calculation finishes; because q4 is batching, you won't see the final result. Try adding a (Thread/sleep 1000) before the shutdowns to see the calculation finish.
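To illustrate the "workers stop when all producers are done" idea without relying on a sleep, here is a minimal sketch in plain Clojure on top of java.util.concurrent. It is not pipejine's implementation; `run-stage` and its arguments are hypothetical names made up for this example. Each producer counts down a latch when it finishes, and the consumer exits by itself once the latch has reached zero and the queue is empty.

```clojure
(import '(java.util.concurrent CountDownLatch LinkedBlockingQueue TimeUnit))

(defn run-stage
  "Hypothetical helper, not part of pipejine. Starts `n-producers` producer
  threads and one consumer thread around a bounded queue, waits for the
  consumer to finish, and returns a vector of everything it consumed."
  [n-producers items-per-producer]
  (let [queue          (LinkedBlockingQueue. 5)
        producers-done (CountDownLatch. n-producers)
        results        (atom [])
        consumer       (future
                         (loop []
                           (if-let [item (.poll queue 10 TimeUnit/MILLISECONDS)]
                             (do (swap! results conj item)
                                 (recur))
                             ;; Nothing arrived within the timeout. Keep going
                             ;; only while a producer is still running or an
                             ;; item slipped in after the poll timed out.
                             (when (or (pos? (.getCount producers-done))
                                       (not (.isEmpty queue)))
                               (recur)))))]
    (dotimes [p n-producers]
      (future
        (dotimes [i items-per-producer]
          (.put queue [p i]))          ; blocks while the queue is full
        (.countDown producers-done)))  ; this producer is done
    @consumer                          ; returns once the consumer has drained the queue
    @results))

(count (run-stage 2 3)) ;; => 6
```

Blocking on the consumer's future plays the same role as the `(.await (:producers-done q4))` call above: the caller waits for completion instead of guessing a sleep duration. Because `future` runs on the agent thread pool, a `-main` that uses this still needs `(shutdown-agents)` at the end for the JVM to exit promptly.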