Skip to content

Instantly share code, notes, and snippets.

@camsaul
Last active March 7, 2019 03:00
Show Gist options
  • Save camsaul/f22d707c402b100e2250265a3ff9d4c5 to your computer and use it in GitHub Desktop.
Save camsaul/f22d707c402b100e2250265a3ff9d4c5 to your computer and use it in GitHub Desktop.
Test server & client for doing things like benchmarking Clojure HTTP servers or various endpoints. Tweak as needed
(ns metabase.server
(:require [ring.adapter.jetty :as ring-jetty])
(:import org.eclipse.jetty.server.Server
org.eclipse.jetty.server.handler.AbstractHandler))
(defn- ^AbstractHandler async-default [handler]
(#'ring-jetty/async-proxy-handler handler 0))
(defn- create-server
^Server [handler options]
(doto ^Server (#'ring-jetty/create-server options)
(.setHandler (async-default handler))))
(ns metabase.server-test-client
(:require [clj-http.client :as client]
[clojure.core.async :as async]
[metabase.util :as u]
[metabase.util.date :as du]))
(defn- threadsafe-println [& args]
(locking threadsafe-println
(apply println args)))
(defn- consume-responses [{{:keys [responses-chan error-chan finished-chan]} :chans, n :n, url :url}]
(let [start-time-ns (System/nanoTime)]
(async/go-loop [done 0]
(when (zero? (mod done 100))
(threadsafe-println (format "%s %d/%d elapsed: %s" (u/format-color 'blue url) done n (du/format-nanoseconds (- (System/nanoTime) start-time-ns)))))
(when-let [response (async/<! responses-chan)]
(if-not (= (:status response) 200)
;; if response is invalid, throw an error
(async/<! error-chan (ex-info "Unexpected response" response))
;; otherwise inc the done counter...
(let [done (inc done)]
;; and if it was the last run, we're done
(if (>= done n)
(async/>! finished-chan ::done)
;; if not, keep going
(recur done))))))))
(defn- consume-errors [{{:keys [error-chan finished-chan]} :chans, url :url, :as context}]
(async/go
(when-let [e (async/<! error-chan)]
(threadsafe-println "Caught error!" url e)
(async/>! finished-chan ::error))))
(def ^:private timeout-ms (* 5 60 1000))
(defn- block-until-finished [{{:keys [finished-chan]} :chans, :keys [n url]}]
(du/profile (format "%d requests to %s" n url)
(let [[result chan] (async/alts!!
[(async/timeout timeout-ms)
finished-chan])]
(when-not (= chan finished-chan)
(throw (Exception. (format "Timed out after %d milliseconds." timeout-ms))))
result)))
(defn- respond-fn [{{:keys [responses-chan]} :chans}]
(fn [response]
(async/put! responses-chan response)))
(defn- raise-fn [{{:keys [error-chan]} :chans}]
(fn [e]
(async/put! error-chan e)))
(defn- do-n-async-requests [{:keys [n url], :as context}]
(future
(try
(threadsafe-println (format "Making %d requests to %s..." n url))
(let [respond (respond-fn context)]
(dorun
(pmap
(fn [_]
(try
(respond (client/get url))
(catch Throwable e
((raise-fn context) e))))
(range n))))
(catch Throwable e
((raise-fn context) e)))))
(defn- profile-n-requests [endpoint port n]
(let [context {:chans {:responses-chan (async/chan (async/buffer 512))
:error-chan (async/promise-chan)
:finished-chan (async/promise-chan)}
:n n
:url (format "http://localhost:%d/%s" port endpoint)}]
(try
;; set up channel consumers
(consume-responses context)
(consume-errors context)
;; launch a TON of API requests (async)
(do-n-async-requests context)
;; block waiting for the done message
(block-until-finished context)
;; now clean up the context
(finally
(doseq [c (vals (:chans context))]
(async/close! c))))))
(defn- profile-n-requests-for-all-endpoints [port n]
(let [max-active-threads (atom (Thread/activeCount))
futures (for [endpoint ["fast"
"big_response"
"big_slow_response"
"slow_sync"
"slow_async"
"fast_db_access"
"slow_db_access"]]
(future
(profile-n-requests endpoint port n)))]
(try
(du/profile (format "%d requests (%d endpoints)" (* n (count futures)) (count futures))
(doseq [futur futures]
(swap! max-active-threads (fn [current-max]
(let [current-threads (Thread/activeCount)]
(max current-max current-threads))))
(deref futur timeout-ms ::timed-out))
(println "Max # of active threads :" @max-active-threads))
(finally
(doseq [futur futures]
(future-cancel futur))))))
(ns metabase.server-test
(:require [clojure.core.async :as async]
[clojure.java
[io :as io]
[jdbc :as jdbc]]
[compojure.core :as compojure :refer [GET]]
[metabase.server :as server]
[toucan.db :as db])
(:import org.eclipse.jetty.server.Server))
;;; +----------------------------------------------------------------------------------------------------------------+
;;; | Server |
;;; +----------------------------------------------------------------------------------------------------------------+
(defn- threadsafe-println [& args]
(locking threadsafe-println
(apply println args)))
(defonce port-counter (atom 3500))
(defonce ^:private server (atom nil))
(def requests-counter (atom 0))
(def active-requests (atom 0))
(defn- set-server! [^Server new-server]
(when-not (= new-server @server)
(let [[^Server old-server, ^Server actual-new-server] (reset-vals! server new-server)]
(when old-server
(println "stopping server" old-server)
(.stop old-server))
(when (= actual-new-server new-server)
(println "starting server" new-server)
(reset! requests-counter 0)
(reset! active-requests 0)
(.start new-server))
actual-new-server)))
(defn log-requests-middleware [handler]
(fn [request respond raise]
(swap! active-requests inc)
(let [total-requests (swap! requests-counter inc)]
(when (zero? (mod total-requests 100))
(threadsafe-println
(format "Total requests: %d; Active requests: %d; Active threads: %d"
total-requests @active-requests (Thread/activeCount)))))
(handler
request
(fn [response]
(swap! active-requests dec)
(respond response))
raise)))
(defonce ^java.util.concurrent.ThreadPoolExecutor async-sync-pool
(java.util.concurrent.Executors/newFixedThreadPool 15))
(defn async [f & args]
(let [c (async/chan 1)]
(.submit async-sync-pool ^Runnable (fn []
(try
(async/put! c (apply f args))
;; TODO - not sure if this is ideal
(catch Throwable e
(async/put! c e))
(finally
(async/close! c)))))
c))
(def db-access-chan (async/chan (async/buffer 15)))
(dotimes [_ 15]
(async/>!! db-access-chan ::db-access))
(defn- do-with-async-db-access [query-fn result-fn]
(let [f (fn [token]
(let [result (try
(query-fn)
(finally
(async/put! db-access-chan token)))]
(result-fn result)))]
(async/go
(when-let [token (async/<! db-access-chan)]
(f token))))
nil)
(defmacro with-async-db-access [[result-binding db-query-form] & body]
`(do-with-async-db-access
(fn [] ~db-query-form)
(fn [~result-binding]
~@body)))
(def handler
(-> (compojure/routes
(GET "/fast" []
(fn [_ respond _]
(respond
{:status 200, :body "OK"})))
(GET "/big_response" []
(fn [_ respond _]
(respond (io/file "/Users/cam/Downloads/build_43102_step_105_container_0.txt"))))
(GET "/big_slow_response" []
(fn [_ respond _]
(respond {:status 200
:body (slurp (io/file "/Users/cam/Downloads/build_43102_step_105_container_0.txt"))})))
(GET "/slow_sync" []
(fn [_ respond _]
(Thread/sleep 1000)
(respond
{:status 200, :body "OK"})))
(GET "/slow_async" []
(fn [_ respond _]
(async/go
(async/<! (async/timeout 1000))
(respond
{:status 200, :body "OK"}))))
;; threadpool + helper tokens ?????
(GET "/fast_db_access" []
(fn [_ respond _]
(async/go
(let [_ (async/<! (async jdbc/query (db/connection) "SELECT 1;"))]
(respond {:status 200, :body "OK"})))))
;; threadpool + helper tokens
(GET "/slow_db_access" []
(fn [_ respond _]
(async/go
(let [result (async/<! (async jdbc/query (db/connection) "SELECT pg_sleep(1);"))]
(respond {:status 200, :body "OK"}))))))
log-requests-middleware))
(defn- start-server!
"(Re)start a server."
[]
(metabase.db/setup-db-if-needed!)
(let [port (swap! port-counter inc)
server (#'server/create-server handler (assoc (#'server/jetty-config) :port port))]
(set-server! server)
(println (format "Started server on port %d." port))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment