Created
August 14, 2015 06:22
-
-
Save robert-stuttaford/3bd5240c988f05092504 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns onyx-tx-report-queue | |
(:require [clojure.core.async :refer [>!! alts!! chan close! put! thread]] | |
[clojure.tools.logging :as log] | |
;; :all so clj-refactor doesn't remove it: | |
[onyx.plugin.core-async :refer :all] | |
[datomic.api :as d]) | |
(:import [java.util.concurrent TimeUnit])) | |
(defn prepare-datom
  "Turns a datom into a plain vector [e attr-ident v tx added].
  The attribute id in the second position is resolved to its ident via
  `d/ident` against `db`. Every other position is passed through unchanged."
  [db datom]
  (let [[eid attr-id value tx-id added?] datom]
    [eid (d/ident db attr-id) value tx-id added?]))
(defn- tx-attr-idents
  "Returns a sorted vector of the distinct attribute idents referenced by the
  datoms in `tx-data`, resolved against `db`. Used only for log summaries."
  [db tx-data]
  (->> tx-data
       (map :a)
       distinct
       (map (partial d/attribute db))
       (map :ident)
       sort
       vec))

(defn inject-tx-report-chan
  "Onyx :lifecycle/before-task-start hook.

  Connects to `db-uri` and creates an output channel with buffer `capacity`.
  It then starts a background thread that polls the connection's Datomic
  tx-report queue and puts one segment per datom onto that channel. Each
  segment is a map with these keys:
    :datom  - the datom as returned by `prepare-datom`
    :db-uri - the source database URI
    :t      - basis t of the transaction's :db-after

  Returns a map with these keys:
    :core.async/chan - the channel segments are written to
    :control-chan    - putting a value on (or closing) this channel makes the
                       thread exit at its next check, which happens at least
                       every 5 s because polls time out after 5 s.

  The thread removes the tx-report queue when it exits, whether it stopped
  normally or failed. Any exception is logged at error level and rethrown on
  the worker thread.

  NOTE(review): a `>!!` blocked on a full output channel does not observe the
  control channel until a consumer takes from it."
  [event {:keys [db-uri capacity]}]
  (let [conn (d/connect db-uri)
        tx-report-ch (chan capacity)
        control-ch (chan)]
    (thread
      (log/info "Started tx-report thread for:" db-uri)
      (try
        (let [queue (d/tx-report-queue conn)]
          ;; Non-blocking check: alts!! yields [::continue :default] only when
          ;; nothing has been put on, or closed on, the control channel.
          (while (let [value (alts!! [control-ch] :default ::continue)]
                   (log/debug "tx-report-thread signal value:" value)
                   (= [::continue :default] value))
            ;; Poll with a timeout so the control channel is re-checked even
            ;; when no transactions arrive.
            (when-let [tx-result (.poll queue 5 TimeUnit/SECONDS)]
              (log/debug tx-result)
              (let [db (:db-after tx-result)
                    tx-data (:tx-data tx-result)
                    ;; NOTE(review): the original called an undefined
                    ;; `t-for-db`. The basis t of :db-after is the t of this
                    ;; transaction, and it is the same for every datom in it.
                    t (d/basis-t db)]
                ;; Summarise the attrs present in the tx. This shows the shape
                ;; of the data without logging the whole datoms vector.
                (log/info "Submitting" (count tx-data) "datoms for:" db-uri
                          "\n\t" (tx-attr-idents db tx-data))
                (doseq [datom tx-data
                        :let [segment {:datom (prepare-datom db datom)
                                       :db-uri db-uri
                                       :t t}]]
                  (log/debug "Submitting datom from tx: " (:datom segment))
                  (>!! tx-report-ch segment)))))
          (log/info "Stopped tx-report thread for:" db-uri))
        (catch Exception e
          (log/error e "TX-REPORT-TAKE exception for:" db-uri)
          (throw e))
        (finally
          ;; Without this, a crashed thread would leave Datomic filling a
          ;; queue nobody reads.
          (d/remove-tx-report-queue conn))))
    {:core.async/chan tx-report-ch
     :control-chan control-ch}))
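;; This teardown hook was spiked and is meant to be disabled, by commenting
;; out the :lifecycle/after-task-stop entry in tx-report-calls below.
;; It is disabled because it is called after the first transaction is
;; processed rather than when the job is killed. It is kept to show the
;; intended shutdown sequence.
;; NOTE(review): in this listing the after-task-stop entry is NOT commented
;; out. Confirm which behaviour is intended.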
(defn dispose-tx-report-chan
  "Onyx :lifecycle/after-task-stop hook; the counterpart of
  `inject-tx-report-chan`.

  Removes the tx-report queue for `db-uri`'s connection. It then signals the
  polling thread to stop via :control-chan and closes both channels. Returns
  `event` without :control-chan and :core.async/chan.

  Channels missing from `event` are skipped instead of passed to put!/close!
  as nil, which would throw. This matters, for example, on a second call,
  because the returned event no longer carries them."
  [event {:keys [db-uri]}]
  (d/remove-tx-report-queue (d/connect db-uri))
  (log/info "Stopping tx-report thread for" db-uri "...")
  (when-let [control-ch (:control-chan event)]
    ;; Either the ::stop value or the closed channel makes the thread's
    ;; alts!! return something other than its ::continue default, which
    ;; ends the polling loop.
    (put! control-ch ::stop)
    (close! control-ch))
  (when-let [out-ch (:core.async/chan event)]
    (close! out-ch))
  (dissoc event :control-chan :core.async/chan))
;; Lifecycle calls for the :tx-report task. It is referenced by keyword
;; (:onyx-tx-report-queue/tx-report-calls) from tx-report-lifecycles.
;; The thread is started before the task starts and torn down after it stops.
(def tx-report-calls
  {:lifecycle/after-task-stop dispose-tx-report-chan
   :lifecycle/before-task-start inject-tx-report-chan})
(defn tx-report-lifecycles
  "Builds the Onyx lifecycle entries for the :tx-report task. The result
  has two entries:
  - Our tx-report calls, carrying `db-uri` and the channel `capacity`.
  - The core.async plugin's reader calls."
  [db-uri capacity]
  (let [task :tx-report
        tx-report-entry {:lifecycle/task task
                         :lifecycle/calls :onyx-tx-report-queue/tx-report-calls
                         :db-uri db-uri
                         :capacity capacity}
        reader-entry {:lifecycle/task task
                      :lifecycle/calls :onyx.plugin.core-async/reader-calls}]
    (vector tx-report-entry reader-entry)))
(defn lazy-worker-bee
  "Onyx :onyx/fn for the :lazy-worker-bee task. It ignores `entries` and
  returns `segment` unchanged, making it a pass-through placeholder."
  [entries segment]
  (identity segment))
(defn catalog
  "Builds the Onyx catalog with two entries, both using `batch-size`:
  - :tx-report, a core.async input task limited to one peer.
  - :lazy-worker-bee, a function task backed by `lazy-worker-bee`.
  `db-uri` is not used by this function; the URI is supplied to the input
  task via tx-report-lifecycles."
  [db-uri batch-size]
  (let [input-task {:onyx/name :tx-report
                    :onyx/ident :core.async/read-from-chan
                    :onyx/type :input
                    :onyx/medium :core.async
                    :onyx/batch-size batch-size
                    :onyx/max-peers 1
                    :onyx/doc "Reads segments from a core.async channel"}
        worker-task {:onyx/name :lazy-worker-bee
                     :onyx/fn :onyx-tx-report-queue/lazy-worker-bee
                     :onyx/type :function
                     :onyx/batch-size batch-size}]
    (vector input-task worker-task)))
(comment
  ;; REPL usage example (never evaluated when the namespace loads).
  ;; Requires onyx.api to be loaded and `peer-config` to be bound to an
  ;; Onyx peer configuration. This namespace provides neither.
  (let [db-uri "datomic:free://your-db"
        input-chan-capacity 100
        batch-size 1]
    (onyx.api/submit-job
     peer-config
     {:catalog (catalog db-uri batch-size)
      ;; An Onyx workflow is a vector of [from to] edges.
      :workflow [[:tx-report :lazy-worker-bee]]
      :lifecycles (tx-report-lifecycles db-uri input-chan-capacity)
      :flow-conditions []
      :task-scheduler :onyx.task-scheduler/balanced})))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment