Last active
June 16, 2021 11:41
-
-
Save DeLaGuardo/808353f70475d96cdd4a484e65da0a0e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns user | |
(:require [clojure.core.async :as async])) | |
(defn now-millis [] | |
(System/currentTimeMillis)) | |
(defn throttled-chan | |
"Returns a channel that will throttle attempts to take items according to the rate-limits. | |
rate-limits should be a sequence of pairs of integers. The first item in each pair | |
is describing how many items is possible to take, at most, from the input for the interval | |
defined as second item in pair in milliseconds. | |
Output channel will close if either input channel is drained/closed or rate-limits are empty. | |
The output channle is unbuffered by default, unless buf-or-n is given." | |
([rate-limits ch] (throttled-chan rate-limits ch nil)) | |
([rate-limits ch buf-or-n] | |
(let [out-ch (async/chan buf-or-n)] | |
(async/go | |
(loop [now (now-millis) | |
[[rate interval] & rate-limits :as rls] rate-limits | |
timeout interval] | |
(if (seq rls) | |
(let [t-ch (async/timeout timeout) | |
ctrl-ch (async/go | |
(loop [x 0] | |
(if (< x rate) | |
(let [[value port] (async/alts! [t-ch ch])] | |
(condp = port | |
t-ch ::timeout | |
ch (when (some? value) | |
(async/>! out-ch value) | |
(recur (inc x))))) | |
(do (async/<! t-ch) | |
::timeout)))) | |
ctrl (async/<! ctrl-ch) | |
now' (now-millis)] | |
(if (= ::timeout ctrl) | |
(recur now' rate-limits (- (second (first rate-limits)) (- now' (+ now timeout)))) | |
(async/close! out-ch))) | |
(async/close! out-ch)))) | |
out-ch))) | |
(comment | |
(def current-rate-limit | |
"one request per second" | |
(volatile! [1 1000])) | |
(defn rate-limits [] | |
(lazy-seq | |
(cons @current-rate-limit (rate-limits)))) | |
(let [in-ch (async/chan) | |
out-ch (throttled-chan (rate-limits) in-ch)] | |
(async/go-loop [counter 0] | |
(when-let [v (async/<! out-ch)] | |
(prn (format "%02d %s" v (now-millis))) | |
(when (= counter 10) | |
;; Increase rate-limit - now it should be 10 rps | |
(vreset! current-rate-limit [10 1000])) | |
(recur (inc counter)))) | |
(async/go-loop [[item & items] (range 50)] | |
(if (some? item) | |
(do (async/>! in-ch item) | |
(recur items)) | |
(async/close! in-ch)))) | |
) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment