Last active
November 21, 2019 20:09
-
-
Save ichramm/0348f2cd068b9e3092fab0686ceae1d8 to your computer and use it in GitHub Desktop.
stream paginated
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;;; | |
;; Client | |
;;;; | |
(defn stream-seq-response-with-more! | |
"Returns a lazy sequence that wraps the streaming of data from the server" | |
[conn results have-more?] | |
(lazy-seq | |
(if (empty? results) | |
(if have-more? | |
(when-let [result (send-request-get-response-sync! conn {:method :more})] | |
(cons (first (:data result)) | |
(stream-seq-response! conn | |
(rest (:data result)) | |
(:have-more? result)))) | |
(return-connection conn)) | |
(cons (first results) | |
(stream-seq-response! conn | |
(rest results) | |
have-more?))))) | |
;;;; | |
;; Server | |
;;;; | |
; TODO: Remove this if the other approach passes stress test (which seems to be the case, btw) | |
(defn stream-seq-response-with-more | |
"Helper function that streams potentially long sequences to the client" | |
([socket results] | |
(stream-seq-response-with-more socket results 100 10000)) | |
([socket results batch-size read-timeout] | |
(deferred/loop [remaining results] | |
(let [head (take batch-size remaining) | |
remaining (drop batch-size remaining) | |
have-more? (boolean (seq remaining))] | |
(deferred/chain | |
{:success? true | |
:result {:data head | |
:have-more? have-more?}} ;; :random (apply str (repeat 1000 "NaN ")) | |
(fn [response] | |
;(log/debug "Sending" (count (get-in response [:result :data])) "records") | |
(stream/put! socket response)) | |
(fn [sent-ok] | |
(if sent-ok | |
(if have-more? | |
(stream/try-take! socket read-timeout) | |
true) | |
(log/error "Failed to send response"))) | |
(fn [req] | |
(if req | |
(cond | |
(or (not have-more?) (= :stop (:method req))) true | |
(= :more (:method req)) (deferred/recur remaining) ; next batch | |
:else (do | |
(log/warn "Unexpected request received while streaming: " req) | |
(stream/put! socket {:success? false | |
:error "Busy, try again"}) | |
true)) | |
(do | |
(log/warnf "No `:more` command received in %d milliseconds" read-timeout) | |
true)))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment