Created
November 11, 2022 08:08
-
-
Save niwinz/2c08cc0366c943207e9ae0e70ac52cf3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns user | |
(:require | |
[clojure.tools.namespace.repl :as r] | |
[clojure.core.async :as a] | |
[promesa.core :as p] | |
[promesa.exec :as px] | |
[promesa.exec.csp :as sp] | |
[promesa.protocols :as pt] | |
[promesa.util :as pu]) | |
(:import | |
java.util.concurrent.CompletableFuture | |
java.util.concurrent.CompletionStage | |
java.util.function.Function | |
java.util.concurrent.atomic.AtomicLong)) | |
(defn dbg
  "Debug-print hook used by the benchmark processes.

  Accepts any number of arguments. The printing form below is disabled
  with the `#_` reader discard, so as written this function prints
  nothing and returns nil."
  [& args]
  ;; Remove the `#_` to print `args` with `prn` while holding the monitor
  ;; of the `dbg` function object (so concurrent callers do not interleave).
  #_(locking dbg (apply prn args))
  nil)
(defn timestamp
  "Returns the current wall-clock time in milliseconds, as reported by
  `System/currentTimeMillis`."
  []
  (System/currentTimeMillis))
(defn start-test-promesa
  "Runs a producer/consumer benchmark on top of promesa's CSP channels
  (`promesa.exec.csp`) and prints summary statistics with `prn`.

  Arguments:
    nfabs - number of \"fabrics\"; each fabric gets its own `in-ch` channel
    nprod - number of producer processes started per fabric
    ncons - number of consumer processes started per fabric
    ms    - how long to let the processes run before `close-ch` is closed
            (passed to `sp/sleep`)

  Each producer repeatedly offers the current `(timestamp)` on its fabric's
  `in-ch` and then waits on a timeout of up to 1000 ms; each consumer takes a
  timestamp, records how long ago it was produced, and then waits on a
  timeout of up to 300 ms. All processes also watch the shared `close-ch`
  and stop once it is closed.

  Printed values: process count, message count, summed delay (ms), total
  time (whole seconds), average delay (ms) and message rate (msg/s). The
  average and the rate are reported as 0 when their divisor is zero (no
  messages delivered, or a run shorter than one second) instead of throwing
  an ArithmeticException.

  Returns nil."
  [nfabs nprod ncons ms]
  (dbg "start test" nfabs nprod ncons)
  (let [msg-counter  (AtomicLong. 0)
        proc-counter (AtomicLong. 0)
        delay-sum    (AtomicLong. 0)]
    (letfn [(start-consumer! [fid cid close-ch in-ch]
              (dbg "start consumer" (str "id:" fid "-" cid))
              (.incrementAndGet ^AtomicLong proc-counter)
              (sp/go-loop []
                ;; Only the received value matters here: it is nil once
                ;; `close-ch` has been closed, which ends the loop.
                (let [[sent-at] (sp/alts! [close-ch in-ch])]
                  (when (some? sent-at)
                    (let [elapsed (- (timestamp) sent-at)]
                      (dbg "consumer"
                           (str "id:" fid "-" cid)
                           (str "elapsed:" elapsed "ms"))
                      (.addAndGet ^AtomicLong delay-sum (long elapsed))
                      (.incrementAndGet ^AtomicLong msg-counter)
                      ;; Simulated processing time before the next take.
                      (sp/<! (sp/timeout-chan (rand-int 300)))
                      (recur))))))

            (start-producer! [fid pid close-ch in-ch]
              (dbg "start producer" (str "id:" fid "-" pid))
              (.incrementAndGet ^AtomicLong proc-counter)
              (sp/go-loop []
                ;; Either put a fresh timestamp on `in-ch` or observe
                ;; `close-ch`; keep looping only after a successful put.
                (let [[ok? port] (sp/alts! [[in-ch (timestamp)] close-ch])]
                  (when (and ok? (= port in-ch))
                    (dbg "producer" (str "id:" fid "-" pid))
                    ;; Simulated production interval before the next put.
                    (sp/<! (sp/timeout-chan (rand-int 1000)))
                    (recur)))))

            (start-fabric! [close-ch fid]
              ;; One channel per fabric, shared by its consumers and producers.
              ;; Returns the set of values returned by the started go-loops.
              (let [in-ch       (sp/chan)
                    mk-consumer #(start-consumer! fid % close-ch in-ch)
                    mk-producer #(start-producer! fid % close-ch in-ch)]
                (-> #{}
                    (into (map mk-consumer) (range ncons))
                    (into (map mk-producer) (range nprod)))))]

      (let [close-ch (sp/chan)
            all      (into #{} (mapcat (partial start-fabric! close-ch)) (range nfabs))
            t1       (timestamp)]
        ;; Let the processes run for `ms`, then signal shutdown by closing
        ;; `close-ch` and wait for every process to finish.
        (sp/sleep ms)
        (sp/close! close-ch)
        (run! deref all)
        (let [safe-quot      (fn [n d] (if (zero? d) 0 (quot n d)))
              total-messages (.get ^AtomicLong msg-counter)
              total-delay    (.get ^AtomicLong delay-sum)
              total-procs    (.get ^AtomicLong proc-counter)
              ;; Whole seconds; truncates to 0 for runs shorter than 1 s.
              total-time     (quot (- (timestamp) t1) 1000)
              avg-delay      (safe-quot total-delay total-messages)
              rate           (safe-quot total-messages total-time)]
          (prn "finished" (str "procs/count:" total-procs))
          (prn "finished" (str "messages/count:" total-messages))
          (prn "finished" (str "messages/delay-sum:" total-delay "ms"))
          (prn "finished" (str "messages/total-time:" total-time "s"))
          (prn "finished" (str "messages/avg:" avg-delay "ms"))
          (prn "finished" (str "messages/rate:" rate "msg/s")))))))
(defn start-test-core-async
  "Runs a producer/consumer benchmark on top of core.async channels and
  prints summary statistics with `prn`.

  Arguments:
    nfabs - number of \"fabrics\"; each fabric gets its own `in-ch` channel
    nprod - number of producer go-loops started per fabric
    ncons - number of consumer go-loops started per fabric
    ms    - how long, in milliseconds, to let the processes run before
            `close-ch` is closed

  Each producer repeatedly offers the current `(timestamp)` on its fabric's
  `in-ch` and then parks on a timeout of up to 1000 ms; each consumer takes a
  timestamp, records how long ago it was produced, and then parks on a
  timeout of up to 300 ms. Every go-loop also watches the shared `close-ch`
  and stops once it is closed.

  Printed values: process count, message count, summed delay (ms), total
  time (whole seconds), average delay (ms) and message rate (msg/s). The
  average and the rate are reported as 0 when their divisor is zero (no
  messages delivered, or a run shorter than one second) instead of throwing
  an ArithmeticException.

  Returns nil."
  [nfabs nprod ncons ms]
  (dbg "start test" nfabs nprod ncons)
  (let [msg-counter  (AtomicLong. 0)
        proc-counter (AtomicLong. 0)
        delay-sum    (AtomicLong. 0)]
    (letfn [(start-consumer! [fid cid close-ch in-ch]
              (dbg "start consumer" (str "id:" fid "-" cid))
              (.incrementAndGet ^AtomicLong proc-counter)
              (a/go-loop []
                ;; Only the received value matters here: it is nil once
                ;; `close-ch` has been closed, which ends the loop.
                (let [[sent-at] (a/alts! [close-ch in-ch])]
                  (when (some? sent-at)
                    (let [elapsed (- (timestamp) sent-at)]
                      (dbg "consumer" (str "id:" fid "-" cid) (str "elapsed:" elapsed "ms"))
                      (.addAndGet ^AtomicLong delay-sum (long elapsed))
                      (.incrementAndGet ^AtomicLong msg-counter)
                      ;; Simulated processing time before the next take.
                      (a/<! (a/timeout (rand-int 300)))
                      (recur))))))

            (start-producer! [fid pid close-ch in-ch]
              (dbg "start producer" (str "id:" fid "-" pid))
              (.incrementAndGet ^AtomicLong proc-counter)
              (a/go-loop []
                ;; Either put a fresh timestamp on `in-ch` or observe
                ;; `close-ch`; keep looping only after a successful put.
                (let [[ok? port] (a/alts! [[in-ch (timestamp)] close-ch])]
                  (when (and ok? (= port in-ch))
                    (dbg "producer" (str "id:" fid "-" pid))
                    ;; Simulated production interval before the next put.
                    (a/<! (a/timeout (rand-int 1000)))
                    (recur)))))

            (start-fabric! [close-ch fid]
              ;; One channel per fabric, shared by its consumers and producers.
              ;; Returns the set of channels returned by the started go-loops.
              (let [in-ch       (a/chan)
                    mk-consumer #(start-consumer! fid % close-ch in-ch)
                    mk-producer #(start-producer! fid % close-ch in-ch)]
                (-> #{}
                    (into (map mk-consumer) (range ncons))
                    (into (map mk-producer) (range nprod)))))]

      (let [close-ch (a/chan)
            all      (into #{} (mapcat (partial start-fabric! close-ch)) (range nfabs))
            t1       (timestamp)]
        ;; Block the calling thread for `ms` using core.async itself, so this
        ;; benchmark does not rely on promesa for its own timing.
        (a/<!! (a/timeout ms))
        ;; Signal shutdown and wait for every go-loop channel to complete.
        (a/close! close-ch)
        (run! a/<!! all)
        (let [safe-quot      (fn [n d] (if (zero? d) 0 (quot n d)))
              total-messages (.get ^AtomicLong msg-counter)
              total-delay    (.get ^AtomicLong delay-sum)
              total-procs    (.get ^AtomicLong proc-counter)
              ;; Whole seconds; truncates to 0 for runs shorter than 1 s.
              total-time     (quot (- (timestamp) t1) 1000)
              avg-delay      (safe-quot total-delay total-messages)
              rate           (safe-quot total-messages total-time)]
          (prn "finished" (str "procs/count:" total-procs))
          (prn "finished" (str "messages/count:" total-messages))
          (prn "finished" (str "messages/delay-sum:" total-delay "ms"))
          (prn "finished" (str "messages/total-time:" total-time "s"))
          (prn "finished" (str "messages/avg:" avg-delay "ms"))
          (prn "finished" (str "messages/rate:" rate "msg/s")))))))
;; user=> (start-test-core-async 100 5 5 20000) | |
;; "finished" "procs/count:1000" | |
;; "finished" "messages/count:20377" | |
;; "finished" "messages/delay-sum:23900ms" | |
;; "finished" "messages/total-time:20s" | |
;; "finished" "messages/avg:1ms" | |
;; "finished" "messages/rate:1018msg/s" | |
;; nil | |
;; user=> (start-test-promesa 100 5 5 20000) | |
;; "finished" "procs/count:1000" | |
;; "finished" "messages/count:20389" | |
;; "finished" "messages/delay-sum:15111ms" | |
;; "finished" "messages/total-time:20s" | |
;; "finished" "messages/avg:0ms" | |
;; "finished" "messages/rate:1019msg/s" | |
;; nil | |
;; Since core.async channels only accept 1024 pending handlers,
;; the following tests can't be run on core.async
;; user=> (start-test-promesa 500 5 5 20000) | |
;; "finished" "procs/count:5000" | |
;; "finished" "messages/count:84533" | |
;; "finished" "messages/delay-sum:4387559ms" | |
;; "finished" "messages/total-time:21s" | |
;; "finished" "messages/avg:51ms" | |
;; "finished" "messages/rate:4025msg/s" | |
;; nil | |
;; user=> (start-test-promesa 1500 5 5 20000) | |
;; "finished" "procs/count:15000" | |
;; "finished" "messages/count:82955" | |
;; "finished" "messages/delay-sum:83177574ms" | |
;; "finished" "messages/total-time:22s" | |
;; "finished" "messages/avg:1002ms" | |
;; "finished" "messages/rate:3770msg/s" | |
;; nil | |
;; user=> (start-test-promesa 10000 5 5 20000) | |
;; "finished" "procs/count:100000" | |
;; "finished" "messages/count:70153" | |
;; "finished" "messages/delay-sum:888977268ms" | |
;; "finished" "messages/total-time:37s" | |
;; "finished" "messages/avg:12671ms" | |
;; "finished" "messages/rate:1896msg/s" | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment