Created
February 3, 2017 16:53
-
-
Save aengelberg/0336196eff53550c5fb17a8ba5b535f7 to your computer and use it in GitHub Desktop.
Manifold examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns hello-manifold.core | |
(:require | |
[aleph.http :as http] | |
[byte-streams :as bs] | |
[cheshire.core :as json] | |
[clojure.string :as str] | |
[manifold.deferred :as d] | |
[manifold.executor :as e] | |
[manifold.stream :as s])) | |
;;; ROADMAP | |
;; - | |
;; Utils (ignore me plz) | |
(defn where-am-i?
  "Name of the Java thread that is executing the caller."
  []
  (let [current (Thread/currentThread)]
    (.getName current)))
(let [safe-printer (agent nil)]
  (defn log
    "Prints `args` space-separated on a single line, prefixed with a short
    timestamp (epoch millis mod 100000) and the name of the calling thread.

    The println itself runs as an action on a private agent, so lines from
    concurrent callers are written one at a time. Returns nil immediately,
    without waiting for the line to be printed."
    [& args]
    ;; Capture everything that describes the call site *now*, on the caller's
    ;; thread: once inside the agent action we are on a pool thread at some
    ;; later moment, so both the thread name and the clock would be wrong.
    (let [thread (where-am-i?)
          stamp  (mod (System/currentTimeMillis) 100000)
          line   (str/join " " (list* (str "[" stamp "]")
                                      (str "[" thread "]\t")
                                      args))]
      ;; println is blocking I/O, so use send-off (meant for potentially
      ;; blocking actions) rather than send's fixed-size pool.
      (send-off safe-printer (fn [_] (println line)))
      nil)))
(defn inc*
  "Simulates a slow computation: logs the input, blocks the calling thread
  for one second, then logs and returns `x` plus one."
  [x]
  (log "inc'ing" x)
  (Thread/sleep 1000)
  ;; doto evaluates (inc x) once, hands it to the log call as the last
  ;; argument, and then returns that same value.
  (doto (inc x)
    (->> (log "=>"))))
(comment | |
;;; PROMISES | |
;; A promise is a box that can be "realized" or not. API: | |
;; - (promise) - creates a promise | |
;; - (realized? p) - has it been delivered?
;; - (deliver p value) - delivers a value, or does nothing if a value already exists | |
;; - (deref p) or @p - blocks the current thread until delivered, then returns that value | |
;;; FUTURES | |
;; A future is an asynchronous computation that can be manipulated like a promise. API: | |
;; - (future ...) - creates a future | |
;; - (realized? f) - has the computation completed? | |
;; - (deref f) or @f - blocks until the computation completes, then returns its value
;;; DEFERREDS | |
;; A deferred is a box with two slots: "success" and "error" state. Core API: | |
;; - (d/deferred) - creates a deferred | |
;; - (realized? d) - has it been realized with either error or success? | |
;; - (d/success! d val) - delivers a success value | |
;; - (d/error! d val) - delivers an error value | |
;; - (deref d) - blocks until success value (and returns it) or error value (and throws it) | |
;; - (d/on-realized d success-callback error-callback) - sets callbacks (super useful!) | |
;;; DEFERRED FUTURES | |
;; A shortcut to creating an async computation that dumps into a deferred. | |
;; - (d/future ...) | |
;; - Automatic error handling | |
;; - Runs on an unbounded thread pool | |
;; How to chain computations together? | |
;; Naive way (the only way you can do it with futures) | |
(as-> (future 1) f | |
(future (inc* @f)) | |
(future (inc* @f)) | |
(future (inc* @f)) | |
(future (inc* @f)) | |
(log "=>" @f)) | |
;; deferred's true callbacks unlock the more powerful `d/chain` | |
(as-> (d/future (inc* 1)) f | |
(d/chain f inc* inc* inc* inc*) | |
(log "=>" @f)) | |
(comment ;; Diagram for the above `d/chain` call:
  D1 = (inc 1)  = << 2 >>
  D2 = (inc D1) = << 3 >>
  D3 = (inc D2) = << 4 >>
  D4 = (inc D3) = << 5 >>
  D5 = (inc D4) = << 6 >>)
;; return values to functions in `d/chain` may have as many additional | |
;; layers of deferredness as you want, which are automatically | |
;; swallowed into the overall deferred | |
(as-> (d/future (inc* 1)) f | |
(d/chain f (fn [x] (d/future (d/future (d/future (inc* x)))))) | |
(log "=>" @f)) | |
;; `d/chain` also magically treats non-deferred as deferred | |
(d/chain 1 inc*) | |
(comment | |
;; equivalent to | |
(d/chain (d/success-deferred 1) inc*) | |
) | |
;; What happens when an error is thrown in the middle of the chain? | |
(as-> (d/future 1) f | |
(d/chain f inc* inc* #(/ % 0) inc* inc*) | |
(log "=>" @f)) | |
(comment | |
D1 = 1 = << 1 >> | |
D2 = (inc D1) = << 2 >> | |
D3 = (inc D2) = << 3 >> | |
D4 = (/ D3 0) = << ERROR division by zero >> | |
D5 = (inc D4) = << ERROR division by zero >> | |
D6 = (inc D5) = << ERROR division by zero >>) | |
;; Introducing `d/catch` | |
(as-> (d/future (Thread/sleep 100) 1) f | |
(d/chain f inc* inc* (fn [x] (/ x 0)) inc* inc*) | |
(d/catch f Exception (fn [e] | |
(log "Got exception" e) | |
:error)) | |
(log "=>" @f)) | |
(comment ;; Diagram for the above `d/chain` call: | |
D1 = 1 = << 1 , nil >> | |
D2 = (inc D1) = << 2 , nil >> | |
D3 = (inc D2) = << 3 , nil >> | |
D4 = (/ D3 0) = << nil , ERROR division by zero >> | |
D5 = (inc D4) = << nil , ERROR division by zero >> | |
D6 = (inc D5) = << nil , ERROR division by zero >> | |
D7 = (try D6 | |
(catch Exception e | |
(log "Got exception" e) | |
:error)) | |
= << :error , nil >>) | |
;; Introducing `d/zip` | |
(let [d (d/zip | |
(d/chain (d/onto (d/future (Thread/sleep 200) 0) fixed) inc* inc*) | |
(d/chain (d/onto (d/future (Thread/sleep 200) 10) fixed) inc* inc*) | |
(d/chain (d/onto (d/future (Thread/sleep 200) 100) fixed) inc* inc*))] | |
(log @d)) | |
(comment | |
D1 = 0 = << 0 >> | |
D2 = (inc D1) = << 1 >> | |
D3 = (inc D2) = << 2 >> | |
D4 = 10 = << 10 >> | |
D5 = (inc D4) = << 11 >> | |
D6 = (inc D5) = << 12 >> | |
D7 = 100 = << 100 >> | |
D8 = (inc D7) = << 101 >> | |
D9 = (inc D8) = << 102 >> | |
D10 = [D3 D6 D9] = << (2 12 102) >>
;; Network | |
D1 → D2 → D3 ↘ | |
D4 → D5 → D6 → D10 | |
D7 → D8 → D9 ↗ | |
) | |
) | |
;; More concrete example | |
(def base-url
  "Root URL that every relative path in the fetch examples is appended to."
  "http://jsonplaceholder.typicode.com")
(defn decode-body
  "Slurps the :body of `resp` into a string and parses it as JSON. The `true`
  flag asks cheshire to turn object keys into keywords."
  [resp]
  (let [raw-json (slurp (:body resp))]
    (json/parse-string raw-json true)))
;; Not using manifold | |
(defn fetch
  "Blocking GET of `relative-path` under `base-url`. Logs the path, waits for
  the response by dereferencing the result of `http/get`, and returns the
  decoded body."
  [relative-path]
  (log "Fetching" relative-path)
  (let [url      (str base-url relative-path)
        response @(http/get url)]
    (decode-body response)))
(defn posts-for-user-expanded
  "Fetches the posts of `user-id` one request at a time and returns them as a
  vector, each post gaining a :comments vector of [email body] pairs."
  [user-id]
  (let [comment-pairs (fn [post-id]
                        (mapv (juxt :email :body)
                              (fetch (str "/posts/" post-id "/comments"))))
        posts         (fetch (str "/posts?userId=" user-id))]
    (mapv (fn [post]
            (assoc post :comments (comment-pairs (:id post))))
          posts)))
(defn all-users-expanded
  "Fetches every user sequentially and returns a vector of users, each with a
  :posts entry produced by `posts-for-user-expanded`."
  []
  (into []
        (map (fn [user]
               (assoc user :posts (posts-for-user-expanded (:id user)))))
        (fetch "/users")))
(comment | |
(time (puget.printer/pprint | |
(all-users-expanded)))) | |
#_(def web-coordinator (e/fixed-thread-executor 2)) | |
;; Using manifold and parallelism | |
(defn fetch-async
  "Non-blocking counterpart of `fetch`: chains onto the result of `http/get`
  a step that logs the path and a step that decodes the body, and returns the
  result of that `d/chain` call."
  [relative-path]
  (let [announce (fn [response]
                   (log "Fetched" relative-path)
                   response)]
    (-> (http/get (str base-url relative-path))
        (d/chain announce decode-body))))
(defn posts-for-user-expanded-async
  "Async counterpart of `posts-for-user-expanded`. Fetches the posts of
  `user-id`, then issues one comments request per post and combines them with
  `d/zip`; each post gains a :comments vector of [email body] pairs."
  [user-id]
  (letfn [(with-comments [post]
            (d/chain
              (fetch-async (str "/posts/" (:id post) "/comments"))
              (fn [comments] (mapv (juxt :email :body) comments))
              (fn [pairs] (assoc post :comments pairs))))]
    (d/chain
      (fetch-async (str "/posts?userId=" user-id))
      (fn [posts]
        (apply d/zip (map with-comments posts))))))
(defn all-users-expanded-async
  "Async counterpart of `all-users-expanded`. Fetches all users, then expands
  each one with `posts-for-user-expanded-async` and combines the per-user
  results with `d/zip`."
  []
  (letfn [(with-posts [user]
            (d/chain
              (posts-for-user-expanded-async (:id user))
              (fn [posts] (assoc user :posts posts))))]
    (d/chain
      (fetch-async "/users")
      (fn [users]
        (apply d/zip (map with-posts users))))))
;;; LET-FLOW | |
(comment | |
(d/let-flow [users (fetch-async "/users") | |
user (first users) | |
user-id (:id user) | |
posts (fetch-async (str "/posts?userId=" user-id)) | |
post (first posts) | |
post-id (:id post) | |
comments (fetch-async (str "/posts/" post-id "/comments"))] | |
{:user user | |
:post post | |
:comments comments}) | |
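;; roughly equivalent to nesting `d/chain` calls by hand:
;; (d/chain (fetch-async "/users")
;;   (fn [users]
;;     (let [user (first users)]
;;       (d/chain (fetch-async (str "/posts?userId=" (:id user)))
;;         (fn [posts]
;;           (let [post (first posts)]
;;             (d/chain (fetch-async (str "/posts/" (:id post) "/comments"))
;;               (fn [comments]
;;                 {:user user :post post :comments comments}))))))))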
) | |
(defn stats-callback
  "Writes `stats` in readable form on its own line and returns nil."
  [stats]
  (println (pr-str stats)))
(defn stats-callback2
  "No-op stats callback: accepts `stats`, prints nothing, and returns nil."
  [stats]
  ;; The print is switched off with the reader discard macro, so the function
  ;; body is empty. Delete the `#_` to turn the output back on.
  #_(prn "stats" stats))
(comment | |
(e/register-execute-pool-stats-callback #'stats-callback2) | |
(def fixed (e/fixed-thread-executor 5 {:stats-callback #'stats-callback}))) | |
;;; TAKEAWAYS | |
;; - Most `d/*` functions, unless explicitly specified, do not create new threads, | |
;; they just pile work onto existing threads. | |
;; - So make sure you understand what thread something will run on! | |
;; - But when explicitly specified (`d/future`, `d/onto`, etc), you get control over | |
;; those threads / threadpools. | |
;; - Doesn't hurt to wrap stuff in `d/future` when in doubt |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment