Skip to content

Instantly share code, notes, and snippets.

@ichramm
Last active November 21, 2019 20:09
Show Gist options
  • Save ichramm/0348f2cd068b9e3092fab0686ceae1d8 to your computer and use it in GitHub Desktop.
Save ichramm/0348f2cd068b9e3092fab0686ceae1d8 to your computer and use it in GitHub Desktop.
stream paginated
;;;;
;; Client
;;;;
(defn stream-seq-response-with-more!
"Returns a lazy sequence that wraps the streaming of data from the server"
[conn results have-more?]
(lazy-seq
(if (empty? results)
(if have-more?
(when-let [result (send-request-get-response-sync! conn {:method :more})]
(cons (first (:data result))
(stream-seq-response! conn
(rest (:data result))
(:have-more? result))))
(return-connection conn))
(cons (first results)
(stream-seq-response! conn
(rest results)
have-more?)))))
;;;;
;; Server
;;;;
; TODO: Remove this if the other approach passes stress test (which seems to be the case, btw)
(defn stream-seq-response-with-more
"Helper function that streams potentially long sequences to the client"
([socket results]
(stream-seq-response-with-more socket results 100 10000))
([socket results batch-size read-timeout]
(deferred/loop [remaining results]
(let [head (take batch-size remaining)
remaining (drop batch-size remaining)
have-more? (boolean (seq remaining))]
(deferred/chain
{:success? true
:result {:data head
:have-more? have-more?}} ;; :random (apply str (repeat 1000 "NaN "))
(fn [response]
;(log/debug "Sending" (count (get-in response [:result :data])) "records")
(stream/put! socket response))
(fn [sent-ok]
(if sent-ok
(if have-more?
(stream/try-take! socket read-timeout)
true)
(log/error "Failed to send response")))
(fn [req]
(if req
(cond
(or (not have-more?) (= :stop (:method req))) true
(= :more (:method req)) (deferred/recur remaining) ; next batch
:else (do
(log/warn "Unexpected request received while streaming: " req)
(stream/put! socket {:success? false
:error "Busy, try again"})
true))
(do
(log/warnf "No `:more` command received in %d milliseconds" read-timeout)
true))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment