Created
September 3, 2014 15:59
-
-
Save viperscape/f0e13ab6227e8fe1de4d to your computer and use it in GitHub Desktop.
async chans and kuroshio streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns chan-test.core | |
(:gen-class) | |
(:require [clojure.core.async :as async] | |
[kuroshio.core :as k])) | |
(defn create-string
  "Builds the benchmark input: one string made of 10000 CRLF-terminated
  lines, each containing a single dot."
  []
  (apply str (repeat 10000 ".\r\n")))
(defn println-err
  "Like println, but writes to the current *err* stream instead of *out*."
  [& args]
  (let [err-writer *err*]
    ;; Rebind *out* so println itself does the formatting and newline handling.
    (binding [*out* err-writer]
      (apply println args))))
(defn process
  "Simulates a unit of work: blocks the calling thread for 10 ms and then
  returns `line` unchanged."
  [line]
  (let [result line]
    (Thread/sleep 10)
    result))
(defn reader-seq
  "Returns a lazy seq of the lines in string `s`, read through a
  BufferedReader wrapped around a StringReader."
  [s]
  (-> s
      (java.io.StringReader.)
      (java.io.BufferedReader.)
      (line-seq)))
(def in-chan
  "Unbuffered channel carrying work items (and ::quit markers) to the consumers."
  (async/chan))

(def out-chan
  "Unbuffered channel carrying processed items from the consumers to the aggregator."
  (async/chan))

(def wait-chan
  "Unbuffered channel on which the aggregator signals ::done."
  (async/chan))
(defn start-async-consumers
  "Spawns `num-consumers` core.async threads. Each one repeatedly takes an
  item from in-chan, runs it through `process`, and puts the result on
  out-chan. A worker stops as soon as it takes ::quit."
  [num-consumers]
  (dotimes [_ num-consumers]
    (async/thread
      (loop [item (async/<!! in-chan)]
        (when (not= item ::quit)
          (async/>!! out-chan (process item))
          (recur (async/<!! in-chan)))))))
(defn start-async-aggregator
  "Starts a core.async thread that takes `expected` items from out-chan,
  discarding each one, and then puts ::done on wait-chan.

  With no arguments, `expected` defaults to 10000. Returns the channel
  produced by `async/thread`."
  ([]
   (start-async-aggregator 10000))
  ([expected]
   (async/thread
     ;; The taken values are not used; only the count matters.
     (dotimes [_ expected]
       (async/<!! out-chan))
     (async/>!! wait-chan ::done))))
(defn async-test
  "Runs the core.async benchmark: starts 8 consumers and the aggregator,
  feeds every line of the generated input into in-chan, sends one ::quit per
  consumer, then blocks until the aggregator reports on wait-chan.
  Returns the value taken from wait-chan."
  []
  (let [workers 8
        lines   (reader-seq (create-string))]
    (start-async-consumers workers)
    (start-async-aggregator)
    (doseq [line lines]
      (async/>!! in-chan line))
    ;; shutdown workers: one ::quit marker per consumer
    (dotimes [_ workers]
      (async/>!! in-chan ::quit))
    (async/<!! wait-chan)))
(defn sync-test
  "Baseline benchmark: runs `process` on every line of the generated input,
  one after another on the calling thread. Returns nil."
  []
  (let [lines (reader-seq (create-string))]
    ;; dorun forces the lazy map purely for its side effects.
    (dorun (map process lines))))
(defn stream-test
  "Runs the kuroshio benchmark: 8 futures take items from a `work` stream,
  run them through `process`, and put the results on a `results` stream.
  The calling thread feeds in every line of the generated input, sends one
  ::quit per worker, then takes 10000 results."
  []
  (let [workers 8
        work    (k/new-stream)
        results (k/new-stream)]
    ;; setup workers
    (dotimes [_ workers]
      (future
        (loop [item (k/take! work)]
          (when (not= item ::quit)
            (k/put! results (process item))
            (recur (k/take! work))))))
    (doseq [line (reader-seq (create-string))]
      (k/put! work line))
    ;; shutdown workers
    (dotimes [_ workers]
      (k/put! work ::quit))
    ;; drain one result per input line
    (dotimes [_ 10000]
      (k/take! results))))
(defn -main
  "Entry point: times the core.async benchmark and then the kuroshio stream
  benchmark, printing a label before each elapsed-time report.

  `stream-test` runs its workers with `future`, which uses the agent thread
  pool. Those threads are non-daemon, so without `shutdown-agents` the JVM
  can stay alive for about a minute after the benchmarks finish."
  [& args]
  (try
    ;(time (sync-test)) ;;always 10x10000
    (print "async-test ") (time (async-test)) ;; 12.5 sec
    (print "stream-test ") (time (stream-test)) ;; 12.5 sec
    (finally
      ;; Release the agent/future thread pool so the process can exit promptly.
      (shutdown-agents))))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Leiningen project definition for the chan-test benchmark.
(defproject chan-test "0.1.0-SNAPSHOT"
  ;; Description and URL replace the generated FIXME placeholders.
  :description "async chans and kuroshio streams"
  :url "https://gist.github.com/viperscape/f0e13ab6227e8fe1de4d"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.7.0-alpha1"]
                 [org.clojure/core.async "0.1.338.0-5c5012-alpha"]
                 [kuroshio "0.2.3-SNAPSHOT"]]
  ;; The main namespace is AOT-compiled only when building the uberjar.
  :main ^:skip-aot chan-test.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment