Last active
March 7, 2019 03:00
-
-
Save camsaul/f22d707c402b100e2250265a3ff9d4c5 to your computer and use it in GitHub Desktop.
Test server & client for doing things like benchmarking Clojure HTTP servers or various endpoints. Tweak as needed
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns metabase.server | |
(:require [ring.adapter.jetty :as ring-jetty]) | |
(:import org.eclipse.jetty.server.Server | |
org.eclipse.jetty.server.handler.AbstractHandler)) | |
(defn- ^AbstractHandler async-default [handler] | |
(#'ring-jetty/async-proxy-handler handler 0)) | |
(defn- create-server | |
^Server [handler options] | |
(doto ^Server (#'ring-jetty/create-server options) | |
(.setHandler (async-default handler)))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns metabase.server-test-client | |
(:require [clj-http.client :as client] | |
[clojure.core.async :as async] | |
[metabase.util :as u] | |
[metabase.util.date :as du])) | |
(defn- threadsafe-println [& args] | |
(locking threadsafe-println | |
(apply println args))) | |
(defn- consume-responses [{{:keys [responses-chan error-chan finished-chan]} :chans, n :n, url :url}] | |
(let [start-time-ns (System/nanoTime)] | |
(async/go-loop [done 0] | |
(when (zero? (mod done 100)) | |
(threadsafe-println (format "%s %d/%d elapsed: %s" (u/format-color 'blue url) done n (du/format-nanoseconds (- (System/nanoTime) start-time-ns))))) | |
(when-let [response (async/<! responses-chan)] | |
(if-not (= (:status response) 200) | |
;; if response is invalid, throw an error | |
(async/<! error-chan (ex-info "Unexpected response" response)) | |
;; otherwise inc the done counter... | |
(let [done (inc done)] | |
;; and if it was the last run, we're done | |
(if (>= done n) | |
(async/>! finished-chan ::done) | |
;; if not, keep going | |
(recur done)))))))) | |
(defn- consume-errors [{{:keys [error-chan finished-chan]} :chans, url :url, :as context}] | |
(async/go | |
(when-let [e (async/<! error-chan)] | |
(threadsafe-println "Caught error!" url e) | |
(async/>! finished-chan ::error)))) | |
(def ^:private timeout-ms (* 5 60 1000)) | |
(defn- block-until-finished [{{:keys [finished-chan]} :chans, :keys [n url]}] | |
(du/profile (format "%d requests to %s" n url) | |
(let [[result chan] (async/alts!! | |
[(async/timeout timeout-ms) | |
finished-chan])] | |
(when-not (= chan finished-chan) | |
(throw (Exception. (format "Timed out after %d milliseconds." timeout-ms)))) | |
result))) | |
(defn- respond-fn [{{:keys [responses-chan]} :chans}] | |
(fn [response] | |
(async/put! responses-chan response))) | |
(defn- raise-fn [{{:keys [error-chan]} :chans}] | |
(fn [e] | |
(async/put! error-chan e))) | |
(defn- do-n-async-requests [{:keys [n url], :as context}] | |
(future | |
(try | |
(threadsafe-println (format "Making %d requests to %s..." n url)) | |
(let [respond (respond-fn context)] | |
(dorun | |
(pmap | |
(fn [_] | |
(try | |
(respond (client/get url)) | |
(catch Throwable e | |
((raise-fn context) e)))) | |
(range n)))) | |
(catch Throwable e | |
((raise-fn context) e))))) | |
(defn- profile-n-requests [endpoint port n] | |
(let [context {:chans {:responses-chan (async/chan (async/buffer 512)) | |
:error-chan (async/promise-chan) | |
:finished-chan (async/promise-chan)} | |
:n n | |
:url (format "http://localhost:%d/%s" port endpoint)}] | |
(try | |
;; set up channel consumers | |
(consume-responses context) | |
(consume-errors context) | |
;; launch a TON of API requests (async) | |
(do-n-async-requests context) | |
;; block waiting for the done message | |
(block-until-finished context) | |
;; now clean up the context | |
(finally | |
(doseq [c (vals (:chans context))] | |
(async/close! c)))))) | |
(defn- profile-n-requests-for-all-endpoints [port n] | |
(let [max-active-threads (atom (Thread/activeCount)) | |
futures (for [endpoint ["fast" | |
"big_response" | |
"big_slow_response" | |
"slow_sync" | |
"slow_async" | |
"fast_db_access" | |
"slow_db_access"]] | |
(future | |
(profile-n-requests endpoint port n)))] | |
(try | |
(du/profile (format "%d requests (%d endpoints)" (* n (count futures)) (count futures)) | |
(doseq [futur futures] | |
(swap! max-active-threads (fn [current-max] | |
(let [current-threads (Thread/activeCount)] | |
(max current-max current-threads)))) | |
(deref futur timeout-ms ::timed-out)) | |
(println "Max # of active threads :" @max-active-threads)) | |
(finally | |
(doseq [futur futures] | |
(future-cancel futur)))))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns metabase.server-test | |
(:require [clojure.core.async :as async] | |
[clojure.java | |
[io :as io] | |
[jdbc :as jdbc]] | |
[compojure.core :as compojure :refer [GET]] | |
[metabase.server :as server] | |
[toucan.db :as db]) | |
(:import org.eclipse.jetty.server.Server)) | |
;;; +----------------------------------------------------------------------------------------------------------------+ | |
;;; | Server | | |
;;; +----------------------------------------------------------------------------------------------------------------+ | |
(defn- threadsafe-println [& args] | |
(locking threadsafe-println | |
(apply println args))) | |
(defonce port-counter (atom 3500)) | |
(defonce ^:private server (atom nil)) | |
(def requests-counter (atom 0)) | |
(def active-requests (atom 0)) | |
(defn- set-server! [^Server new-server] | |
(when-not (= new-server @server) | |
(let [[^Server old-server, ^Server actual-new-server] (reset-vals! server new-server)] | |
(when old-server | |
(println "stopping server" old-server) | |
(.stop old-server)) | |
(when (= actual-new-server new-server) | |
(println "starting server" new-server) | |
(reset! requests-counter 0) | |
(reset! active-requests 0) | |
(.start new-server)) | |
actual-new-server))) | |
(defn log-requests-middleware [handler] | |
(fn [request respond raise] | |
(swap! active-requests inc) | |
(let [total-requests (swap! requests-counter inc)] | |
(when (zero? (mod total-requests 100)) | |
(threadsafe-println | |
(format "Total requests: %d; Active requests: %d; Active threads: %d" | |
total-requests @active-requests (Thread/activeCount))))) | |
(handler | |
request | |
(fn [response] | |
(swap! active-requests dec) | |
(respond response)) | |
raise))) | |
(defonce ^java.util.concurrent.ThreadPoolExecutor async-sync-pool | |
(java.util.concurrent.Executors/newFixedThreadPool 15)) | |
(defn async [f & args] | |
(let [c (async/chan 1)] | |
(.submit async-sync-pool ^Runnable (fn [] | |
(try | |
(async/put! c (apply f args)) | |
;; TODO - not sure if this is ideal | |
(catch Throwable e | |
(async/put! c e)) | |
(finally | |
(async/close! c))))) | |
c)) | |
(def db-access-chan (async/chan (async/buffer 15))) | |
(dotimes [_ 15] | |
(async/>!! db-access-chan ::db-access)) | |
(defn- do-with-async-db-access [query-fn result-fn] | |
(let [f (fn [token] | |
(let [result (try | |
(query-fn) | |
(finally | |
(async/put! db-access-chan token)))] | |
(result-fn result)))] | |
(async/go | |
(when-let [token (async/<! db-access-chan)] | |
(f token)))) | |
nil) | |
(defmacro with-async-db-access [[result-binding db-query-form] & body] | |
`(do-with-async-db-access | |
(fn [] ~db-query-form) | |
(fn [~result-binding] | |
~@body))) | |
(def handler | |
(-> (compojure/routes | |
(GET "/fast" [] | |
(fn [_ respond _] | |
(respond | |
{:status 200, :body "OK"}))) | |
(GET "/big_response" [] | |
(fn [_ respond _] | |
(respond (io/file "/Users/cam/Downloads/build_43102_step_105_container_0.txt")))) | |
(GET "/big_slow_response" [] | |
(fn [_ respond _] | |
(respond {:status 200 | |
:body (slurp (io/file "/Users/cam/Downloads/build_43102_step_105_container_0.txt"))}))) | |
(GET "/slow_sync" [] | |
(fn [_ respond _] | |
(Thread/sleep 1000) | |
(respond | |
{:status 200, :body "OK"}))) | |
(GET "/slow_async" [] | |
(fn [_ respond _] | |
(async/go | |
(async/<! (async/timeout 1000)) | |
(respond | |
{:status 200, :body "OK"})))) | |
;; threadpool + helper tokens ????? | |
(GET "/fast_db_access" [] | |
(fn [_ respond _] | |
(async/go | |
(let [_ (async/<! (async jdbc/query (db/connection) "SELECT 1;"))] | |
(respond {:status 200, :body "OK"}))))) | |
;; threadpool + helper tokens | |
(GET "/slow_db_access" [] | |
(fn [_ respond _] | |
(async/go | |
(let [result (async/<! (async jdbc/query (db/connection) "SELECT pg_sleep(1);"))] | |
(respond {:status 200, :body "OK"})))))) | |
log-requests-middleware)) | |
(defn- start-server! | |
"(Re)start a server." | |
[] | |
(metabase.db/setup-db-if-needed!) | |
(let [port (swap! port-counter inc) | |
server (#'server/create-server handler (assoc (#'server/jetty-config) :port port))] | |
(set-server! server) | |
(println (format "Started server on port %d." port)))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment