;; core.async thread pool experiments: go blocks vs. threads, etc.
'[clojure.core.async :as async]
'[clojure.core.async.impl.protocols :as protocols]
'[clojure.core.async.impl.concurrent :as conc]
'[clojure.core.async.impl.exec.threadpool :as tp])
(import 'java.util.concurrent.Executors)
(def executor-svc
  "Fixed-size thread pool that backs the custom core.async dispatch executor.

  Threads are produced by `conc/counted-thread-factory` with the name format
  \"my-async-dispatch-%d\"; the trailing `true` is presumably the daemon
  flag (confirm against core.async's impl.concurrent namespace)."
  (Executors/newFixedThreadPool
   ;; `newFixedThreadPool` only has (int) and (int, ThreadFactory) overloads,
   ;; so a thread count is mandatory.
   ;; NOTE(review): the count was absent from the original; 8 mirrors
   ;; core.async's default dispatch pool size -- adjust for the experiment.
   8
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))
(def my-executor
  "core.async `Executor` protocol implementation that forwards every task it
  is given to `executor-svc`."
  (reify protocols/Executor
    (exec [_ task]
      ;; The hint selects Executor.execute(Runnable) without reflection.
      (.execute executor-svc ^Runnable task))))
;; Replace the root value of core.async's dispatch executor var with a delay
;; that yields `my-executor`; the previous root value is ignored.
(alter-var-root (var clojure.core.async.impl.dispatch/executor)
                (fn [_previous]
                  (delay my-executor)))
(defn- thread-id
  "Returns the `hash` of the current Thread object, used as a printable
  identifier for the thread in log lines."
  []
  (-> (Thread/currentThread)
      hash))
(defn net-call
  "Performs one blocking HTTP GET against the local test server and returns
  the response body as a string.

  Variadic: only the first argument is used, as a label in the log output;
  any further arguments are ignored. Logs the calling thread together with
  the pool size and queue size of `executor-svc` before the request, and logs
  again after the request has returned. There is no error handling, so an
  exception thrown by `slurp` propagates to the caller and the \"Ending\"
  line is not printed."
  [& args]
  (let [label (first args)]
    (println "Go Thread" (thread-id) "Starting" label)
    (println "Pool size:" (.getPoolSize executor-svc)
             "Queue size:" (.. executor-svc getQueue size))
    (let [response (slurp "http://localhost:8000/ping")] ;; Network IO with a slow server
      ;; (throw (ex-info "error from go" {})) ;; uncomment to test exception in go block
      ;; Log completion only after the blocking call has actually finished,
      ;; then hand the body back so the return value is still the response.
      (println "Go Thread" (thread-id) "Ending" label)
      response)))

;; Experiment 1: launch 30 go blocks that each make the blocking call, then
;; block the calling thread on every result channel in turn.
(comment
  (run! async/<!!
        (map #(async/go (net-call %))
             (range 30))))

;; Experiment 2: the same fan-out, but each task runs via `async/thread`.
;; NOTE(review): the form that launches each task was missing from the
;; original text; `async/thread` is assumed from the "Thread" log labels.
;; If blocking work was meant to run between the two log lines, it was not
;; present in the original either -- confirm.
(comment
  (run! async/<!!
        (map #(async/thread
                (println "Thread" (thread-id) "Starting" %)
                (println "Thread" (thread-id) "Ending" %))
             (range 30))))

;; Experiment 3: feed 30 inputs through `pipeline-async` with parallelism 1
;; and drain the output channel until it closes.
(comment
  (let [orgs-channel     (async/to-chan! (range 30))
        response-channel (async/chan (async/dropping-buffer 10))]
    (async/pipeline-async
     1
     response-channel
     ;; `pipeline-async` requires its function to deliver results on the
     ;; supplied channel and then close it. Passing `net-call` directly never
     ;; closes that channel, so the pipeline stalls and the final `<!!` never
     ;; returns. The wrapper below puts the response and always closes.
     (fn [org result]
       (async/go
         (try
           (async/>! result (net-call org))
           (finally
             (async/close! result)))))
     orgs-channel)
    (async/<!! (async/go-loop [response (async/<! response-channel)]
                 (if response
                   (recur (async/<! response-channel))
                   {:message "complete"})))))
