Skip to content

Instantly share code, notes, and snippets.

@niwinz
Created November 11, 2022 08:08
Show Gist options
  • Save niwinz/2c08cc0366c943207e9ae0e70ac52cf3 to your computer and use it in GitHub Desktop.
Save niwinz/2c08cc0366c943207e9ae0e70ac52cf3 to your computer and use it in GitHub Desktop.
(ns user
(:require
[clojure.tools.namespace.repl :as r]
[clojure.core.async :as a]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.exec.csp :as sp]
[promesa.protocols :as pt]
[promesa.util :as pu])
(:import
java.util.concurrent.CompletableFuture
java.util.concurrent.CompletionStage
java.util.function.Function
java.util.concurrent.atomic.AtomicLong))
(defn dbg
[& params]
#_(locking dbg
(apply prn params)))
(defn timestamp
[]
(System/currentTimeMillis))
(defn start-test-promesa
[nfabs nprod ncons ms]
(dbg "start test" nfabs nprod ncons)
(let [msg-counter (AtomicLong. 0)
proc-counter (AtomicLong. 0)
delay-sum (AtomicLong. 0)]
(letfn [(start-consumer! [fid cid close-ch in-ch]
(dbg "start consumer" (str "id:" fid "-" cid))
(.incrementAndGet ^AtomicLong proc-counter)
(sp/go-loop []
(let [[val port] (sp/alts! [close-ch in-ch])]
(when (some? val)
(let [elapsed (- (timestamp) val)]
(dbg "consumer"
(str "id:" fid "-" cid)
(str "elapsed:" elapsed "ms"))
(.addAndGet ^AtomicLong delay-sum ^long elapsed)
(.incrementAndGet ^AtomicLong msg-counter)
(sp/<! (sp/timeout-chan (rand-int 300)))
(recur))))))
(start-producer! [fid pid close-ch in-ch]
(dbg "start producer" (str "id:" fid "-" pid))
(.incrementAndGet ^AtomicLong proc-counter)
(sp/go-loop []
(let [[val port] (sp/alts! [[in-ch (timestamp)] close-ch])]
(when (and val (= port in-ch))
(dbg "producer" (str "id:" fid "-" pid))
(sp/<! (sp/timeout-chan (rand-int 1000)))
(recur)))))
(start-fabric! [close-ch fid]
(let [in-ch (sp/chan)
mk-consumer #(start-consumer! fid % close-ch in-ch)
mk-producer #(start-producer! fid % close-ch in-ch)]
(-> #{}
(into (map mk-consumer) (range ncons))
(into (map mk-producer) (range nprod)))))]
(let [close-ch (sp/chan)
all (into #{} (mapcat (partial start-fabric! close-ch)) (range nfabs))
t1 (timestamp)]
;; (dbg "start stoper")
(sp/sleep ms)
;; (dbg "ended stoper")
(sp/close! close-ch)
(run! deref all)
(let [total-messages (long msg-counter)
total-delay (long delay-sum)
total-procs (long proc-counter)
total-time (long (/ (- (timestamp) t1) 1000))]
(prn "finished" (str "procs/count:" total-procs))
(prn "finished" (str "messages/count:" total-messages))
(prn "finished" (str "messages/delay-sum:" total-delay "ms"))
(prn "finished" (str "messages/total-time:" total-time "s"))
(prn "finished" (str "messages/avg:" (long (/ total-delay total-messages)) "ms"))
(prn "finished" (str "messages/rate:" (long (/ total-messages total-time)) "msg/s")))
))))
(defn start-test-core-async
[nfabs nprod ncons ms]
(dbg "start test" nfabs nprod ncons)
(let [msg-counter (AtomicLong. 0)
proc-counter (AtomicLong. 0)
delay-sum (AtomicLong. 0)]
(letfn [(start-consumer! [fid cid close-ch in-ch]
(dbg "start consumer" (str "id:" fid "-" cid))
(.incrementAndGet ^AtomicLong proc-counter)
(a/go-loop []
(let [[val port] (a/alts! [close-ch in-ch])]
(when (some? val)
(let [elapsed (- (timestamp) val)]
(dbg "consumer" (str "id:" fid "-" cid) (str "elapsed:" elapsed "ms"))
(.addAndGet ^AtomicLong delay-sum ^long elapsed)
(.incrementAndGet ^AtomicLong msg-counter)
(a/<! (a/timeout (rand-int 300)))
(recur))))))
(start-producer! [fid pid close-ch in-ch]
(dbg "start producer" (str "id:" fid "-" pid))
(.incrementAndGet ^AtomicLong proc-counter)
(a/go-loop []
(let [[val port] (a/alts! [[in-ch (timestamp)] close-ch])]
(when (and val (= port in-ch))
(dbg "producer" (str "id:" fid "-" pid))
(a/<! (a/timeout (rand-int 1000)))
(recur)))))
(start-fabric! [close-ch fid]
(let [in-ch (a/chan)
mk-consumer #(start-consumer! fid % close-ch in-ch)
mk-producer #(start-producer! fid % close-ch in-ch)]
(-> #{}
(into (map mk-consumer) (range ncons))
(into (map mk-producer) (range nprod)))))]
(let [close-ch (a/chan)
all (into #{} (mapcat (partial start-fabric! close-ch)) (range nfabs))
t1 (timestamp)]
;; (dbg "start stoper")
(sp/sleep ms)
;; (dbg "ended stoper")
(a/close! close-ch)
(run! a/<!! all)
(let [total-messages (long msg-counter)
total-delay (long delay-sum)
total-procs (long proc-counter)
total-time (long (/ (- (timestamp) t1) 1000))]
(prn "finished" (str "procs/count:" total-procs))
(prn "finished" (str "messages/count:" total-messages))
(prn "finished" (str "messages/delay-sum:" total-delay "ms"))
(prn "finished" (str "messages/total-time:" total-time "s"))
(prn "finished" (str "messages/avg:" (long (/ total-delay total-messages)) "ms"))
(prn "finished" (str "messages/rate:" (long (/ total-messages total-time)) "msg/s")))
))))
;; user=> (start-test-core-async 100 5 5 20000)
;; "finished" "procs/count:1000"
;; "finished" "messages/count:20377"
;; "finished" "messages/delay-sum:23900ms"
;; "finished" "messages/total-time:20s"
;; "finished" "messages/avg:1ms"
;; "finished" "messages/rate:1018msg/s"
;; nil
;; user=> (start-test-promesa 100 5 5 20000)
;; "finished" "procs/count:1000"
;; "finished" "messages/count:20389"
;; "finished" "messages/delay-sum:15111ms"
;; "finished" "messages/total-time:20s"
;; "finished" "messages/avg:0ms"
;; "finished" "messages/rate:1019msg/s"
;; nil
;; Since core.async chanels only accept 1024 pending handlers
;; this test can't be run on core.async
;; user=> (start-test-promesa 500 5 5 20000)
;; "finished" "procs/count:5000"
;; "finished" "messages/count:84533"
;; "finished" "messages/delay-sum:4387559ms"
;; "finished" "messages/total-time:21s"
;; "finished" "messages/avg:51ms"
;; "finished" "messages/rate:4025msg/s"
;; nil
;; user=> (start-test-promesa 1500 5 5 20000)
;; "finished" "procs/count:15000"
;; "finished" "messages/count:82955"
;; "finished" "messages/delay-sum:83177574ms"
;; "finished" "messages/total-time:22s"
;; "finished" "messages/avg:1002ms"
;; "finished" "messages/rate:3770msg/s"
;; nil
;; user=> (start-test-promesa 10000 5 5 20000)
;; "finished" "procs/count:100000"
;; "finished" "messages/count:70153"
;; "finished" "messages/delay-sum:888977268ms"
;; "finished" "messages/total-time:37s"
;; "finished" "messages/avg:12671ms"
;; "finished" "messages/rate:1896msg/s"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment