@faxm0dem
Created May 22, 2015 15:40
riemann reinject example
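(let [; Plain index; events without :state or :ttl get defaults first.
      index   (default {:state "ok" :ttl 60} (index))
      ; Indexes every event, and additionally indexes state transitions
      ; tagged "changed-state".
      indexer (default {:state "ok" :ttl 60}
                index
                (where (not (expired? event))
                  (changed-state {:init "ok"}
                    (tag "changed-state"
                      index))))]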
  (streams
    indexer
    ; Rate of expired collectd events, emitted every 30 seconds.
    (where (tagged "collectd")
      (where (expired? event)
        (with {:metric 1 :host nil :service "riemann-collectd/gauge-expired" :state "ok"}
          (rate 30
            index))))
    ; Every 60 seconds, compute the fraction of load events above 1.2.
    (where (tagged ["usag.workerge" "status.Up"])
      (where (service "load/load-relative/shortterm")
        (where (not (expired? event))
          (fixed-time-window 60
            (smap (fn [events]
                    (let [nall (count events)
                          nhit (count (filter #(< 1.2 (:metric %)) events))
                          ; (count ...) is never nil, so test for an empty
                          ; window explicitly to avoid dividing by zero.
                          frac (if (pos? nall) (/ nhit nall) nil)
                          ; condp evaluates (< threshold frac) per clause.
                          stat (if frac
                                 (condp < frac
                                   0.1250 "critical"
                                   0.0375 "warning"
                                   "ok")
                                 nil)]
                      {:service "overload/count-load_relative"
                       :metric frac
                       :description "count of worker nodes where load-relative > 1.2"
                       :host "gridengine.in2p3.fr"
                       :ttl 240
                       :url "http://path/to/file.html"
                       :time (riemann.time/unix-time-real)
                       :state stat}))
              indexer)))))
    (where (service "users/users")
      (coalesce 15
        (smap folds/sum
          (with {:ttl 120 :host nil :service "total users"}
            indexer))))
    (by :host
      (where metric
        (where (not (expired? event))
          (project [(service "protocols-Udp/protocol_counter-InErrors")
                    (service "protocols-Udp/gauge-InTotal")]
            (smap folds/quotient-sloppy
              (with {:service "protocols-Udp/percent-InErrors"
                     :type_instance "InErrors"
                     :type "percent"
                     :ds_type "gauge"}
                (adjust [:metric * 100]
                  (split
                    (< 50 metric) (with :state "critical" indexer)
                    (< 2 metric) (with :state "warning" indexer)
                    (with :state "ok" indexer)))))))))
    ; Sum the four UDP counters per host and reinject the total, so the
    ; percentage stream above receives it as gauge-InTotal.
    (by :host
      (where metric
        (project [(service "protocols-Udp/protocol_counter-InErrors")
                  (service "protocols-Udp/protocol_counter-NoPorts")
                  (service "protocols-Udp/protocol_counter-RcvbufErrors")
                  (service "protocols-Udp/protocol_counter-InDatagrams")]
          (smap folds/sum
            (with {:service "protocols-Udp/gauge-InTotal"
                   :type_instance "InTotal"
                   :type "gauge"
                   :ds_type "gauge"}
              reinject)))))
    (where (service "riemann streams rate")
      (where (expired? event)
        (with {:state "warning" :ttl 3600}
          indexer)))))
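
The threshold logic in the overload stream is easy to misread, because `condp` passes each threshold as the first argument to `<`. As a rough standalone sketch (the function name `overload-state` is hypothetical and not part of the gist), the same classification behaves like this:

```clojure
; Hypothetical helper, not part of the gist: mirrors the condp form
; used in the overload stream.
(defn overload-state
  "Map a fraction of overloaded samples to a Riemann state string."
  [frac]
  ; condp evaluates (< threshold frac) for each clause, top to bottom.
  (condp < frac
    0.1250 "critical"
    0.0375 "warning"
    "ok"))

(overload-state 0.2)   ; => "critical"
(overload-state 0.05)  ; => "warning"
(overload-state 0.01)  ; => "ok"
```

So a window is "critical" once more than 12.5% of its events exceed 1.2, and "warning" above 3.75%.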
@vipinvkmenon

I was going through this code (pretty helpful, thanks :) )
and I wanted to understand the difference between reinjecting and using the indexer.
I've seen that the documentation recommends avoiding reinject. So is the indexer another way to do the same thing? A more recommended approach?
Thanks

@faxm0dem
Author

How on earth did you find this? ;-)

@faxm0dem
Author

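reinject really does what it says: it sends the event back upstream, to the top of the streams, so every top-level stream sees it again.
index simply feeds the index and doesn't reinject the event. You can think of the index as a photograph or snapshot of the current state: the most recent event for each host and service.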
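
To make the contrast concrete, here is a minimal sketch for a `riemann.config`. The service names `"cpu"` and `"cpu-derived"` are made up for illustration and do not come from the gist:

```clojure
; Sketch only; the service names are hypothetical.
(let [index (index)]
  (streams
    ; index stores the event; it does not flow back into the streams.
    (where (service "cpu")
      index)

    ; reinject hands the renamed event back to the core, so every
    ; top-level stream in this (streams ...) form receives it.
    (where (service "cpu")
      (with :service "cpu-derived"
        reinject))

    ; In this sketch, the only source of "cpu-derived" events is the
    ; reinject above.
    (where (service "cpu-derived")
      index)))
```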

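The documentation calls reinject dangerous because it can create endless loops. Just make sure the reinjected events never reach the stream that reinjects them. That is the case in this example: the reinjected event has the service protocols-Udp/gauge-InTotal, which matches none of the four counters selected by the project that feeds reinject.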
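
A rough sketch of that rule, assuming a hypothetical derived service name `"collectd/derived"`: the guard excludes the renamed event, so it cannot re-enter the stream that reinjects it.

```clojure
(streams
  ; Unsafe (left commented out): the reinjected event is still tagged
  ; "collectd", so it would match again and be reinjected endlessly.
  ; (where (tagged "collectd")
  ;   reinject)

  ; Safer: rename the event and exclude that name in the predicate, so
  ; the reinjected event no longer matches this stream.
  (where (and (tagged "collectd")
              (not (service "collectd/derived")))
    (with :service "collectd/derived"
      reinject)))
```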

reinject can always be avoided, at the cost of some extra verbosity. It is almost always a shortcut taken out of laziness (which is the case in this example).
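
One possible way to drop reinject here, sketched under assumptions: compute the percentage straight from the four counters instead of first publishing their sum. The helper `udp-in-error-percent` is hypothetical, a bare `index` stands in for the gist's `indexer`, and the state thresholds from the original `split` are left out for brevity.

```clojure
; Hypothetical helper: takes the vector emitted by project (InErrors
; first) and returns a percentage event, or nil while any slot is empty.
(defn udp-in-error-percent
  [events]
  (let [in-errors (first events)
        metrics   (map :metric events)]
    (when (and in-errors (every? number? metrics))
      (let [total (reduce + metrics)]
        (assoc in-errors
               :service "protocols-Udp/percent-InErrors"
               :metric  (if (zero? total)
                          0
                          (* 100.0 (/ (:metric in-errors) total))))))))

(let [index (index)]
  (streams
    (by :host
      (where metric
        (project [(service "protocols-Udp/protocol_counter-InErrors")
                  (service "protocols-Udp/protocol_counter-NoPorts")
                  (service "protocols-Udp/protocol_counter-RcvbufErrors")
                  (service "protocols-Udp/protocol_counter-InDatagrams")]
          ; Child 1: index the total directly instead of reinjecting it.
          (smap folds/sum
            (with :service "protocols-Udp/gauge-InTotal"
              index))
          ; Child 2: compute the percentage from the same vector.
          (smap udp-in-error-percent
            index))))))
```

The extra helper and the second child stream are the verbosity mentioned above; in exchange, no event ever travels back to the top of the streams.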
