Example pipeline model: http://imgur.com/a/G7qEy
Each variable gets its own copy of this model. Only some variables need the bootstrap function; the rest run as a plain sequential pipeline.
| ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
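| ;; Forecast generation | |
| ;; Returns 24 [sql-timestamp value] pairs, one for each of the 24 hours after `inst`. | |
| ;; The values are random placeholders in [0, 50). `without-minutes` is not shown in | |
| ;; this gist; it is assumed to truncate `inst` to the hour. | |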
| (defn inst->forecasts [inst] | |
| (for [r (range 1 25)] | |
| [(c/to-sql-time (t/plus (without-minutes inst) (t/hours r))) (rand-int 50)])) | |
| ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
| ;; Async | |
| (defn process-fn [inst] | |
| {:vals (inst->forecasts inst)}) | |
| ;; Each variable of interest has its own launch function. This one launches the | |
| ;; "forecast" job; other namespaces hold the launchers for "variable 1", "variable 2", etc. | |
| ;; Only `schedule`, `process-fn`, and the persist function (`insert-forecasts` here) | |
| ;; are unique to each job. The roles of `in`, `out`, and `chime` are shown in the | |
| ;; pipeline model image linked from the `pipeline.md` file included in this gist. | |
| (defn launch [] | |
| (let [in (chan) | |
| out (chan) | |
| ;; one tick per minute, with the sequence anchored at the top of the current hour | |
| schedule (p/periodic-seq | |
| (let [n (t/now)] | |
| (t/date-time (t/year n) (t/month n) (t/day n) (t/hour n) 0)) | |
| (t/minutes 1)) | |
| chime (chime-ch schedule)] | |
| ((persist insert-forecasts db) out) | |
| ((process process-fn) in out) | |
| (realtime chime in) | |
| [in out chime])) |
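The shape of what `inst->forecasts` returns can be checked without a database. The sketch below is not the gist's code: it swaps clj-time for `java.time`, leaves out the `c/to-sql-time` conversion, and assumes that `without-minutes` (not shown in this gist) truncates an instant to the hour.

```clojure
(import '[java.time Duration Instant]
        '[java.time.temporal ChronoUnit])

;; Assumed behaviour of the helper that the gist does not show:
;; drop everything below the hour.
(defn without-minutes ^Instant [^Instant inst]
  (.truncatedTo inst ChronoUnit/HOURS))

;; Same structure as the original: 24 [timestamp value] pairs,
;; one per hour for the 24 hours after the truncated instant.
(defn inst->forecasts [^Instant inst]
  (for [r (range 1 25)]
    [(.plus (without-minutes inst) (Duration/ofHours r))
     (rand-int 50)]))

(def sample (inst->forecasts (Instant/parse "2017-01-01T10:37:12Z")))

(count sample)               ;; => 24
(str (ffirst sample))        ;; => "2017-01-01T11:00:00Z"
(str (first (last sample)))  ;; => "2017-01-02T10:00:00Z"
```

The first pair is one hour after the truncated instant, so a job that fires at 10:37 writes forecasts for 11:00 through 10:00 the next day.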
| (ns ngin.utils | |
| (:require [clojure.core.async :refer [<! >! go]])) | |
| (defn realtime [ch in] | |
| (println (str ">>> started `realtime` in " *ns*)) | |
| (go (loop [] | |
| (when-let [v (<! ch)] | |
| (>! in v) | |
| (recur))) | |
| (println (str ">>> closing `realtime` in " *ns*)))) | |
| (defn process [f] | |
| (fn [in out] | |
| (println (str ">>> started `process` in " *ns*)) | |
| (go (loop [] | |
| (when-let [inst (<! in)] | |
| (>! out (f inst)) | |
| (recur))) | |
| (println (str ">>> closing `process` in " *ns*))))) | |
| (defn persist [f db] | |
| (fn [ch] | |
| (println (str ">>> started `persist` in " *ns*)) | |
| (go (loop [] | |
| (when-let [v (<! ch)] | |
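| ;; NOTE: if `f` blocks (e.g. a database insert), it occupies a go-block thread; | |
| ;; running it via `clojure.core.async/thread` is one way to avoid that. | |
| (f db v) | |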
| (recur))) | |
| (println (str ">>> closing `persist` in " *ns*))))) |
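None of the three stages closes its downstream channel, so the caller has to close `chime`, `in`, and `out` itself to stop a pipeline. That is presumably why `launch` returns all three. The sketch below wires the stages the same way but with stand-ins: an atom plays the database, a plain channel plays the chime channel, and `insert!` and the `(* 10 n)` processing function are hypothetical. The stage functions are compact copies of the ones above with the logging removed.

```clojure
(require '[clojure.core.async :refer [<! >! <!! >!! chan close! go]])

;; Compact copies of the three stages, without the println calls.
;; Each returns the channel of its `go` block, which closes when the loop ends.
(defn realtime [ch in]
  (go (loop []
        (when-let [v (<! ch)]
          (>! in v)
          (recur)))))

(defn process [f]
  (fn [in out]
    (go (loop []
          (when-let [inst (<! in)]
            (>! out (f inst))
            (recur))))))

(defn persist [f db]
  (fn [ch]
    (go (loop []
          (when-let [v (<! ch)]
            (f db v)
            (recur))))))

;; Hypothetical stand-ins for the database and the persist function.
(def db (atom []))
(defn insert! [db v] (swap! db conj v))

(let [ticks     (chan) ; stand-in for the chime channel
      in        (chan)
      out       (chan)
      persisted ((persist insert! db) out)
      processed ((process (fn [n] {:vals (* 10 n)})) in out)
      relayed   (realtime ticks in)]
  ;; Feed three ticks through the pipeline.
  (doseq [n [1 2 3]]
    (>!! ticks n))
  ;; Shut down from source to sink, waiting for each stage to drain
  ;; before closing the next channel.
  (close! ticks) (<!! relayed)
  (close! in)    (<!! processed)
  (close! out)   (<!! persisted))

@db ;; => [{:vals 10} {:vals 20} {:vals 30}]
```

Closing in source-to-sink order and waiting on each stage's `go` channel means no value is dropped in flight. Closing `out` first would let `process` finish its loop, but the value it was trying to put would never reach `persist`.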