Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save BadUncleX/b1057d5730fa507ab4627dbea96c618b to your computer and use it in GitHub Desktop.
Save BadUncleX/b1057d5730fa507ab4627dbea96c618b to your computer and use it in GitHub Desktop.
core.async alts! 选取最快的服务

alts! vs alt!

alts! is a function that accepts a vector of channels to take from and/or channels with values to be put on them (in the form of doubleton vectors: [c v]). The vector may be dynamically constructed; the code calling alts! may not know how many channels it'll be choosing among (and indeed that number need not be constant across invocations).

alt! is a convenience macro which basically acts as a cross between cond and alts!. Here the number of "ports" (channels or channel+value pairs) must be known statically, but in practice this is quite often the case and the cond-like syntax is very clear.

alt! expands to a somewhat elaborate expression using alts!; apart from the syntactic convenience, it offers no extra functionality.

小结: alt! 的参数是确定的 (偶数个) alts! 的参数是不确定的 (可以put!)

alts!! 的底层用到了deliver promise:

(defn alts!!
  "Like alts!, except takes will be made as if by <!!, and puts will
  be made as if by >!!, will block until completed, and not intended
  for use in (go ...) blocks."
  [ports & {:as opts}]
  (let [p (promise)
        ret (do-alts (partial deliver p) ports opts)]
    (if ret
      @ret
      (deref p))))
;;
;; upload multiple image to server
;; first come first served
;;
(ns core_async_alts_fastest.1_alts_upload_image
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go onto-chan close! timeout chan alts! alts!! alt!!]]
))
(defn upload-image
[headshot c]
;; go 语句放在upload method里, 我习惯在外部自己写go调用方法
(go (Thread/sleep (rand 100))
;; 向channel 写入, 类似于deliver
(>! c headshot)))
(comment
(let [c1 (chan)
c2 (chan)
c3 (chan)]
;;
(upload-image "serious.jpg" c1)
(upload-image "fun.jpg" c2)
(upload-image "sassy.jpg" c3)
;; alts! 返回的的内容是一个vector,分别是channel的内容 和 channel本身
(let [[headshot channel] (alts!! [c1 c2 c3])]
(println "Sending headshot notification for" headshot))
)
)
;; go while 版本, 实际上alts!! 是可以取出全部的, 不一定是只返回一个.
(comment
(let [c1 (chan)
c2 (chan)
c3 (chan)]
;;
(upload-image "serious.jpg" c1)
(upload-image "fun.jpg" c2)
(upload-image "sassy.jpg" c3)
;; alts! 返回的的内容是一个vector,分别是channel的内容 和 channel本身
(go (while true (let [[headshot channel] (alts!! [c1 c2 c3])]
(println "Sending headshot notification for" headshot))))
)
)
(ns core-async-alts-fastest.2-alts-google-fastest
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go onto-chan close! timeout chan alt! alts! alt!!]]
))
(defn fake-search [kind]
;; 这里的两个参数外部何时调用?
(fn [c query]
(go
(<! (timeout (rand-int 100)))
(>! c [kind query]))))
;; 三个服务都有双备份,谁快用谁
(def web1 (fake-search :web1))
(def web2 (fake-search :web2))
(def image1 (fake-search :image1))
(def image2 (fake-search :image2))
(def video1 (fake-search :video1))
(def video2 (fake-search :video2))
;; 两个服务中选择最快的那个
;; (fastest "clojure" web1 web2)
(defn fastest [query & replicas]
(let [c (chan)]
(doseq [replica replicas]
;; (fake-search :web1 c "clojure")
(replica c query))
;; 返回内容 [:web1 "clojure"] OR [:web2 "clojure"]
;; 获取c, 谁先往channel写入成功返回谁
c))
;; google search
;; 查询3个服务,每个服务都有2个备份
(defn google [query]
(let [c (chan)
t (timeout 80)]
;; 这里向c 写入3次不同的服务; 每个fastest只有一个channel, 两个中只有一个写成功
(go (>! c (<! (fastest query web1 web2))))
(go (>! c (<! (fastest query image1 image2))))
(go (>! c (<! (fastest query video1 video2))))
(go (loop [i 0 ret []]
(if (= i 3)
ret
;; 这里 v 是什么?
;; @problems
(recur (inc i) (conj ret
;; alt 需要确保后面的参数是偶数, 但是我不知道这里的v有什么用
;; 我觉得还是alts 容易理解
#_(alts! [c t])
;; @problems
;; 下面括号干啥?
(alt! [c t] ([v] v)) ;; what's the fuck of v?
)))))))
(comment
(<!! (google "clojure"))
)
;; from netty socket
;; 下面这个和google上面的有点类似, 即alt! 后面是偶数
;;
(defn- write-loop
"Returns a channel containing the result of a loop that sends
messages over the WebSocket when items are placed on the
write-channel.
If an error occurs or the remote endpoint is closed, the exception
or status code and reason are given as the result. When one of
these events occurs, it closes the write channel so that no new
messages may be attempted. Values already on the write channel that
have yet to be sent to the client will remain on the write channel
so that the user can decide what to do with them.
When anything would stop writing, the read-channel is also closed.
If the user closes the write-channel, the session will be closed
with it.
Sessions will be closed with status code 1000 (CLOSE_NORMAL)."
[^Session session read-channel write-channel result-channel]
(let [remote (.getRemote session)]
(go-loop []
;; here
(alt!
result-channel
;; 这边括号干啥?
;; @problems
([v]
;; The session is already closed, so just deal with the
;; read and write channels.
(close! write-channel)
(close! read-channel)
v)
write-channel
([message]
(if (nil? message)
(do (.close session) (close! read-channel))
(do
;; Errors inside of the sendString call are passed to
;; onWebSocketError so you don't need to capture them
;; here
(.sendString remote message)
(recur))))
:priority true))))
(ns core-async-alts-fastest.3-alts-walkthrough)
(require '[clojure.core.async :as async :refer :all])
;;;; ALTS
;; One killer feature for channels over queues is the ability to wait
;; on many channels at the same time (like a socket select). This is
;; done with `alts!!` (ordinary threads) or `alts!` in go blocks.
;; We can create a background thread with alts that combines inputs on
;; either of two channels. `alts!!` takes either a set of operations
;; to perform - either a channel to take from a [channel value] to put
;; and returns the value (nil for put) and channel that succeeded:
(let [c1 (chan)
c2 (chan)]
(thread (while true
(let [[v ch] (alts!! [c1 c2])]
(println "Read" v "from" ch))))
;; @problems
;; alts! 应该是返回c1 c2中的一个, 怎么两个都返回了?
(>!! c1 "hi")
(>!! c2 "there"))
;; Prints (on stdout, possibly not visible at your repl):
;; Read hi from #<ManyToManyChannel ...>
;; Read there from #<ManyToManyChannel ...>
;; We can use alts! to do the same thing with go blocks:
(let [c1 (chan)
c2 (chan)]
(go (while true
(let [[v ch] (alts! [c1 c2])]
(println "Read" v "from" ch))))
(go (>! c1 "hi"))
(go (>! c2 "there")))
;; Since go blocks are lightweight processes not bound to threads, we
;; can have LOTS of them! Here we create 1000 go blocks that say hi on
;; 1000 channels. We use alts!! to read them as they're ready.
(let [n 1000
cs (repeatedly n chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (go (>! c "hi")))
(dotimes [i n]
(let [[v c] (alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
;; We can combine timeout with `alts!` to do timed channel waits.
;; Here we wait for 100 ms for a value to arrive on the channel, then
;; give up:
(let [c (chan)
begin (System/currentTimeMillis)]
(alts!! [c (timeout 100)])
(println "Gave up after" (- (System/currentTimeMillis) begin)))
(ns core-async-alts-fastest.4-alts-debounce)
;; debounce
;; from here: https://zhuanlan.zhihu.com/p/23497216
;;
;; 其中 in 和 out 是两个 Channel, 代码实现的功能是从 in 读取的数据,
;; 按照事件做 debounce 丢弃掉一部分数据, 保证过于频繁发送的数据只会去最后一个
(defn debounce [in ms]
(let [out (chan)]
(go-loop [last-val nil]
(let [val (if (nil? last-val) (<! in) last-val)
timer (timeout ms)
[new-val ch] (alts! [in timer])]
(condp = ch
timer (do (>! out val) (recur nil))
in (recur new-val))))
out))
(ns core-async-alts-fastest.5-alts-timeout-put-operation
(:require [clojure.core.async :as async :refer [<!! >!! <! >! go onto-chan close! timeout chan alts! alts!! alt!!]]
))
;;
;; from braveclojure: https://www.braveclojure.com/core-async/
;;
;; You can also use alts!! to specify put operations.
;; To do that, place a vector inside the vector you pass to alts!!,
;; like at ➊ in this example:
;; 在 alts!! 后面也可以加put操作, 当然顺序是
(comment
(let [c1 (chan)
c2 (chan)]
(go (<! c2))
(let [[value channel] (alts!! [c1 [c2 "put!"]])]
(println value)
(= channel c2)))
)
; => true
; => true
;;
;; timeout
;; 设置自动超时时间, 放在alts! 里和其他channel一起
;;
(defn upload
[headshot c]
(go (Thread/sleep (rand 100))
(>! c headshot)))
(comment
(let [c1 (chan)]
(upload "serious.jpg" c1)
(let [[headshot channel] (alts!! [c1 (timeout 20)])]
(if headshot
(println "Sending headshot notification for" headshot)
(println "Timed out!"))))
)
; => Timed out!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment