Skip to content

Instantly share code, notes, and snippets.

@jmcarp
Created October 27, 2016 01:08
Show Gist options
  • Select an option

  • Save jmcarp/6b2110c398e892ebc2e0bbf84b3abc58 to your computer and use it in GitHub Desktop.

Select an option

Save jmcarp/6b2110c398e892ebc2e0bbf84b3abc58 to your computer and use it in GitHub Desktop.
new adventures in stream processing
;; Goal: Page if the firehose stops sending to riemann, and close pages if it resumes
;; Replace all firehose events with synthetic heartbeat events and index, then merge with existing stream and listen for expiration
;; h/t @sharms
(where (service #"^cf.*")
(smap (fn [ev] (if (= (:state ev) nil) (event {:service "cf.nozzle.heartbeat" :host "internal" :metric 1 :state "ok" :ttl 30}) ev)) index
(where (service "cf.nozzle.heartbeat")
(changed-state {:init "ok"}
(where (state "ok") (:resolve pd)
(else (where (state "expired") (:trigger pd))))))))
;; Merge all firehose events into a throttled synthetic stream and reinject, then listen for expiration on new stream
(where (service #"^cf\.")
(throttle 1 15
(with {:service "internal.firehose.heartbeat"
:state "ok"
:ttl 30}
reinject)))
(where (service "internal.firehose.heartbeat")
(changed-state {:init "ok"}
(where (state "ok") (:resolve pd)
(else (:trigger pd)))))
;; List for expiration on a firehose stream that happens to emit at short, regular intervals
(where (service #"^cf\.bbs\.LRPs")
(changed-state {:init "ok"}
(where (state "ok") (:resolve pd)
(else (:trigger pd)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment