Last active
April 22, 2024 06:42
-
-
Save v-zhuravlev/d593d12eba6e89f71a253d68da7916d6 to your computer and use it in GitHub Desktop.
Send metrics to influx(and zabbix_sender) from riemann only if value changed(or when heartbeat interval of 300seconds) or if state changed (for service client-test)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; InfluxDB database details where Riemann will store the data. Setup :username and :password | |
;; if you added security step behind riemann database access. | |
(def influxdb-creds { | |
:version :0.9 | |
:host "localhost" | |
:port 8086 | |
:db "riemann" | |
; :username "riemann" | |
; :password "riemann" | |
}) | |
(def influxBatchSender | |
(batch 100 1/10 | |
(async-queue! :agg {:queue-size 1000 | |
:core-pool-size 1 | |
:max-pool-size 4 | |
:keep-alive-time 60000} | |
(influxdb influxdb-creds)))) | |
;; Riemann log file location | |
(logging/init {:file "/var/log/riemann/riemann.log"}) | |
;;check index | |
(defn is-metric-changed? | |
"Is Riemann currently service metric changed?" | |
[event] | |
(->> (list 'and | |
(list '= ':host (get event :host)) | |
(list '= ':service (get event :service))) | |
; Search the current Riemann core's index for any matching events | |
(riemann.index/search (:index @core)) | |
; Take the first match | |
first | |
; Find its metric | |
:metric | |
; Is it the value same as before? | |
(= (get event :metric)))) | |
(defn heartbeat | |
"Sends metric every 'interval' seconds to 'output-stream' destination" | |
[interval output-stream] | |
;https://groups.google.com/forum/#!topic/riemann-users/_oFx8YJ4NNs | |
(pipe - | |
;double check that will work with same service but different host | |
(by [:host :service]( | |
;heartbeat 1 packet per 5mins for each service/host | |
throttle 1 interval | |
output-stream | |
(partial prn "Will send anyway as heartbeat") | |
)))) | |
;zabbix-var to wrap service name. use 'nowrap' if you do not want to wrap | |
(def zabbix-var "riemann") | |
(def to-zabbix-sender-file | |
(fn[event key-wrap] (info ( str | |
"\"" | |
(get event :host) | |
"\" \"" | |
(if (= key-wrap "nowrap") | |
(get event :service) | |
;else wrap in wrap var | |
(str | |
key-wrap | |
"[\\\"" | |
(get event :service) | |
"\\\"]" | |
) | |
) | |
"\" \"" | |
(get event :metric) | |
"\"" | |
)))) | |
(def test-thresholds | |
{"client-test" {:warning 2 :critical 3}}) | |
(def test-exact-thresholds | |
{"client-test" {:exact 1}}) | |
(require '[org.spootnik.riemann.thresholds :refer [threshold-check]]) | |
;; listen on the local interface over TCP (5555), UDP (5555) and websockets (5556) | |
(let [host "0.0.0.0"] | |
(tcp-server {:host host}) | |
(udp-server {:host host}) | |
(ws-server {:host host})) | |
;(instrumentation {:enabled? false}) | |
;; Expire states from its core's index every 60 seconds. Default is 10. | |
(periodically-expire 60) | |
(let [index (index)] | |
(streams | |
(default :ttl 60 | |
;if metric is not the same ,send it now | |
(where (not (is-metric-changed? event)) | |
;(partial prn "Metric has changed") | |
influxBatchSender | |
#(to-zabbix-sender-file % zabbix-var) | |
;if it is the same: send only once per 300 | |
(else | |
;(partial prn "Metric is the same") | |
(heartbeat 300 influxBatchSender) | |
(heartbeat 300 #(to-zabbix-sender-file % zabbix-var)) | |
) | |
) | |
index | |
(where (service "client-test") | |
(smap (threshold-check test-thresholds) | |
(changed :state | |
influxBatchSender | |
#(to-zabbix-sender-file % zabbix-var) | |
)))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
telegraf sample config used with this: