Created
January 30, 2021 22:48
-
-
Save Frozenlock/e0c8b8f81838182fcf36ae14e7dfea5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn -inheritable* | |
[var] | |
(let [*i (doto (proxy [InheritableThreadLocal] [] | |
;; This is where the magic happens. | |
;; childValue is evaluated in the parent thread where bindings should still valid. | |
(childValue [{:keys [bindings]}] | |
(let [new-bindings (merge bindings (get-thread-bindings))] | |
{:bindings new-bindings | |
:get-val (fn [] (with-bindings new-bindings | |
(deref var)))}))) | |
(.set {:get-val #(deref var)}))] | |
(reify clojure.lang.IDeref | |
(deref | |
[this] | |
;; If a binding exists, we are in a Clojure-compatible thread. | |
(if (contains? (get-thread-bindings) var) | |
{:value (deref var) | |
:source :bindings} | |
;; Otherwise try to use the inheritable variable. | |
(if-some [get-val (:get-val (.get *i))] | |
{:value (get-val) | |
:source :inheritable} | |
;; Lastly, default to the var initial value. | |
;; Even if we find ourselves in a 'lost' thread, we can still get the data stored into the atom. | |
{:value (deref var) | |
:source :default})))))) | |
;; Use an atom to easily change 'root' data. | |
(def ^:dynamic -*data (atom {})) | |
(defn set-data | |
[k v] | |
(swap! -*data assoc k v)) | |
;; ^ use above function to set init values. | |
;; Ex: | |
;; (set-data :dsn ...) | |
;; (set-data :release ...) | |
(def -*inheritable (-inheritable* #'-*data)) | |
;; Need an indirection layer to configure existing thread pools | |
(def -thread-local-inheritable | |
(ThreadLocal.)) | |
(defn -get-inheritable | |
[] | |
(if-let [tl-inheritable (.get -thread-local-inheritable)] | |
tl-inheritable | |
-*inheritable)) | |
(defn get-data | |
[] | |
(deref (:value (deref (-get-inheritable))))) | |
(defn -init-inheritable-in-clojure-thread-pool | |
[] | |
(let [max-count 100 | |
init (->> (partition 20 (pmap (fn [_] | |
(let [source (:source (deref (-get-inheritable)))] | |
(if (= source :default) | |
(do (.set -thread-local-inheritable (-inheritable* #'-*data)) | |
source) | |
source))) (range))) | |
(take-while #(some (fn [s] (= s :default)) %)) | |
(take max-count))] | |
(when (= (count init) 100) | |
(throw (ex-info "Couldn't initialize inheritable var"))))) | |
;; HACK: force existing threads in Clojure pool to define the inheritable variable. | |
(-init-inheritable-in-clojure-thread-pool) | |
;; Alternatively, here's a suggestion from Alex Miller: | |
;; You can set your own thread pool to use | |
;; http://clojure.github.io/clojure/clojure.core-api.html#clojure.core/set-agent-send-executor! | |
;; And the send-off equivalent (which future uses) | |
;; I would consider these hostile in a library, but ok in an app | |
;; This macro has a dual purpose: | |
;; 1. Insert new data into the bindings | |
;; 2. Use inherited values to place new bindings when coming from a non-clojure thread. | |
(defmacro with-data | |
[data & body] | |
`(let [data# (merge (get-data) ~data)] | |
(with-bindings {#'-*data | |
;; Atom not used here to avoid confusion (not resetable) | |
(reify clojure.lang.IDeref | |
(deref [_] | |
data#))} | |
~@body))) | |
;;; How it works | |
(comment | |
(future ; use default value or bindings | |
(future ; use default value or bindings | |
(thread ; use inheritable | |
(thread ; use inheritable | |
(with-data {} ; Value from inheritable is put back into bindings | |
(future)) ; value from bindings | |
(future))))) ; FALLBACK to initial value | |
) | |
;;; Examples | |
(comment | |
(with-data {:A 0} (pmap (fn [_] (with-data {:a 1} (get-data))) (range 10))) | |
(defmacro start-thread | |
[& body] | |
`(let [thread# (Thread. ^Runnable (fn [] ~@body))] | |
(.start thread#) | |
thread#)) | |
(with-data {:1 :1} | |
(future | |
(with-data {:2 :2} | |
(future | |
(start-thread | |
(with-data {:3 :3} | |
(println (get-data)) | |
(start-thread | |
(println (get-data)) | |
(start-thread | |
(println (get-data)))))))))) | |
(with-data {:z :z} | |
(future | |
(with-data {:a :a} | |
(future | |
(start-thread | |
(println (get-data)) | |
(start-thread | |
(println (get-data)) | |
(start-thread | |
(with-data {} | |
(future | |
(println (get-data))))))))))) | |
(import java.util.concurrent.Executors java.util.concurrent.ExecutorService) | |
(let [^ExecutorService executor (Executors/newFixedThreadPool 2) | |
p1 (promise) | |
p2 (promise)] | |
(with-data {:a 1} | |
(.submit executor ^Callable (fn [] | |
(deliver p1 (get-data)) | |
(with-data {:b 2} | |
(.submit executor ^Callable (fn [] | |
(deliver p2 (get-data))))))) | |
(let [result [@p1 @p2]] | |
(.shutdown executor) | |
result))) | |
(with-data {:a 1} | |
(future | |
(println "Future " (get-data)) | |
(with-data {:b 2} | |
(start-thread | |
(println "Thread " (get-data)) | |
(with-data {:c 3} | |
(start-thread | |
(println "Thread " (get-data)) | |
(future | |
(println "Last future " (get-data)))))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment