Created
March 7, 2014 19:30
-
-
Save candera/9418198 to your computer and use it in GitHub Desktop.
transact-firehose
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This has to be a separate function. The way we had it before, with | |
;; a loop inside a future, creates a closure that captures the head of | |
;; the sequence. Locals clearing here saves us from that. | |
(defn- enqueue-transactions | |
"Transact a (potentially infinite) sequence of transactions on `q`, | |
reporting status via `status`. Pause for `delay` (if non-nil) | |
between each transaction." | |
[conn [tx & more] q status delay] | |
(if (or (not tx) (:done @status)) | |
(.put q (future :done)) | |
(do | |
(try | |
(.put q (d/transact-async conn tx)) | |
(catch Throwable t | |
(swap! status assoc-in [:producer :error] t) | |
(swap! status assoc :done true) | |
(-> @status :consumer :future future-cancel) | |
(throw t)) | |
(finally | |
(swap! status update-in [:producer :tx-count] (fnil inc 0)) | |
(swap! status assoc-in [:producer :tx] tx) | |
;; This next line lets us reach into a running | |
;; process and determine where we are, so we | |
;; can stop and pick up where we left off. | |
(swap! status assoc-in [:producer :txes-remaining] more))) | |
(when delay (Thread/sleep delay)) | |
(recur conn more q status delay)))) | |
(defn transact-firehose | |
"Given a database connection and a sequence of transactions, shove | |
them into the database as fast as possible. Stops immediately if any | |
of the transactions fail. Return a [future status] pair, where | |
future represents the thing doing the transacting, and can be | |
derefed to block on completion, and status is an atom containing | |
information about the ongoing operation. Set :done in the status map | |
to true to stop the computation. | |
As a simple way to throttle throughput, allows for a short pause of | |
`delay` milliseconds between each transaction. nil means run at full | |
speed." | |
([conn txes] (transact-firehose conn txes nil)) | |
([conn txes delay] | |
(let [q (java.util.concurrent.LinkedBlockingQueue. 1000) | |
status (atom {:start-time (java.util.Date.) | |
:queue q}) | |
producer (future (enqueue-transactions conn txes q status delay)) | |
consumer (future | |
(try | |
(while (and (not (:done @status)) (not= :done (deref (.take q)))) | |
(swap! status update-in [:consumer :tx-count] (fnil inc 0))) | |
(catch Throwable t | |
(swap! status assoc-in [:consumer :error] t) | |
(throw t)) | |
(finally | |
(swap! status assoc :done true))))] | |
(swap! status assoc-in [:producer :future] producer) | |
(swap! status assoc-in [:consumer :future] consumer) | |
[consumer status]))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment