Last active
April 22, 2024 06:42
-
-
Save v-zhuravlev/d593d12eba6e89f71a253d68da7916d6 to your computer and use it in GitHub Desktop.
Send metrics from Riemann to InfluxDB (and zabbix_sender) only if the value changed (or when the 300-second heartbeat interval elapses), or if the state changed (for the service client-test).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Connection settings for the InfluxDB database in which Riemann stores its
;; data. Uncomment :username and :password if access to that database has
;; been secured.
(def influxdb-creds
  {:version :0.9
   :host    "localhost"
   :port    8086
   :db      "riemann"
   ;; :username "riemann"
   ;; :password "riemann"
   })
;; Output stream towards InfluxDB: events pass through `batch` (arguments 100
;; and 1/10) into the :agg async queue, whose only child is the InfluxDB
;; writer built from influxdb-creds.
(def influxBatchSender
  (let [queue-opts {:queue-size      1000
                    :core-pool-size  1
                    :max-pool-size   4
                    :keep-alive-time 60000}
        influx-out (influxdb influxdb-creds)]
    (batch 100 1/10
      (async-queue! :agg queue-opts influx-out))))
;; File that Riemann writes its log to. `info` output from this config ends
;; up in the same log.
(logging/init
  {:file "/var/log/riemann/riemann.log"})
;; Index lookup used to decide whether an event repeats the previous metric.
(defn is-metric-changed?
  "Looks up the current core's index for an event with the same :host and
  :service as `event` and compares metrics.

  NOTE: despite the name, this returns true when the indexed :metric EQUALS
  the :metric of `event` (i.e. the metric is unchanged) and false when they
  differ. When nothing in the index matches, the indexed metric is nil, so
  the result is false for any event with a non-nil metric. Callers that want
  \"changed\" semantics must negate the result."
  [event]
  (let [same-series (list 'and
                          (list '= :host (:host event))
                          (list '= :service (:service event)))
        ;; First indexed event matching this host/service, or nil.
        indexed     (first (riemann.index/search (:index @core) same-series))]
    (= (:metric event) (:metric indexed))))
(defn heartbeat
  "Builds a stream that splits events by [:host :service] and, for each such
  pair, passes them through `(throttle 1 interval ...)` to `output-stream`
  and to a `prn` of a fixed notice. Used so that an unchanged metric is
  still forwarded once per `interval` (seconds, per the original note)."
  [interval output-stream]
  ;; Background: https://groups.google.com/forum/#!topic/riemann-users/_oFx8YJ4NNs
  (let [announce (partial prn "Will send anyway as heartbeat")]
    (pipe -
      ;; TODO(review): double check that this works for the same service
      ;; reported by different hosts.
      (by [:host :service]
        ;; At most 1 event per `interval` for each service/host pair.
        (throttle 1 interval
          output-stream
          announce)))))
(def zabbix-var
  "Name used to wrap the service name when building the Zabbix item key.
  Use \"nowrap\" if you do not want the service name to be wrapped."
  "riemann")
(defn to-zabbix-sender-file
  "Logs, via `info`, one line made of three double-quoted fields separated
  by single spaces: the event's :host, an item key, and the event's :metric.

  The item key is the event's :service unchanged when `key-wrap` is the
  string nowrap; otherwise it is `key-wrap` followed by the service name in
  square brackets and backslash-escaped double quotes."
  [event key-wrap]
  (let [service  (:service event)
        item-key (if (= key-wrap "nowrap")
                   service
                   ;; Otherwise wrap the service name in the key-wrap name.
                   (str key-wrap "[\\\"" service "\\\"]"))]
    (info (str "\"" (:host event) "\" \"" item-key "\" \"" (:metric event) "\""))))
(def test-thresholds
  "Threshold table, keyed by service name, that this file passes to
  threshold-check for the client-test service."
  {"client-test" {:warning  2
                  :critical 3}})
(def test-exact-thresholds
  "Alternative threshold table for client-test using an :exact value.
  Not referenced anywhere else in the visible part of this file."
  {"client-test" {:exact 1}})
(require '[org.spootnik.riemann.thresholds :refer [threshold-check]]) | |
;; Listen on all interfaces ("0.0.0.0"). No ports are given, so each server
;; uses its default; per the original note these are TCP 5555, UDP 5555 and
;; websockets 5556.
(let [bind-address "0.0.0.0"]
  (tcp-server {:host bind-address})
  (udp-server {:host bind-address})
  (ws-server  {:host bind-address}))

;(instrumentation {:enabled? false})

;; Expire states from the core's index every 60 seconds (the default,
;; per the original note, is 10).
(periodically-expire 60)
;; Main stream topology. Every event gets a default :ttl of 60 and is then
;;   1. forwarded to InfluxDB and the Zabbix log line at once when its metric
;;      differs from the indexed one, otherwise only via the heartbeat;
;;   2. stored in the index (after the comparison above, so the comparison
;;      sees the previously indexed event);
;;   3. for service "client-test": run through threshold-check and forwarded
;;      whenever the resulting :state changes.
(let [event-index     (index)
      heartbeat-secs  300
      zabbix-out      #(to-zabbix-sender-file % zabbix-var)]
  (streams
    (default :ttl 60
      ;; is-metric-changed? returns true when the metric is the SAME as the
      ;; indexed one, hence the `not`: this branch handles changed metrics.
      (where (not (is-metric-changed? event))
        ;(partial prn "Metric has changed")
        influxBatchSender
        zabbix-out
        ;; Metric is the same: send only once per heartbeat-secs.
        (else
          ;(partial prn "Metric is the same")
          (heartbeat heartbeat-secs influxBatchSender)
          (heartbeat heartbeat-secs zabbix-out)))

      event-index

      (where (service "client-test")
        (smap (threshold-check test-thresholds)
          (changed :state
            influxBatchSender
            zabbix-out))))))
Sample Telegraf config used together with the Riemann config above:
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stderr.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Retention policy to write to. Empty string writes to the default rp.
retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
write_consistency = "any"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Set the user agent for HTTP POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
# udp_payload = 512
## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false
[[inputs.procstat]]
exe = "influxd"
[[inputs.procstat]]
exe = "telegraf"
[[inputs.procstat]]
pattern = "java -cp /usr/lib/riemann/riemann.jar"
# # Retrieves SNMP values from remote agents
[[inputs.snmp]]
agents = [ "10.100.0.3:161","10.100.0.2:161","10.100.0.1:161" ]
version = 2
community = "public"
name = "snmp"
[[inputs.snmp.field]]
name = "host"
oid = "RFC1213-MIB::sysName.0"
is_tag = true
[[inputs.snmp.table]]
name = "snmp"
inherit_tags = [ "host" ]
oid = "IF-MIB::ifTable"
[[inputs.snmp.table.field]]
name = "ifName"
oid = "IF-MIB::ifName"
is_tag = true
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
another example:
riemann-health->riemann->zabbix
run in cli:
tail -f /var/log/riemann/riemann.log | awk -F ' - ' '/\".+\" \".+\" \".+\"/ {print $3}' | zabbix_sender -z 127.0.0.1 --real-time -i - -vv
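Here the Riemann event's service name must match the key of the Zabbix item, which is written as zabbix-var["item.key"] (with the config above, riemann["<service name>"]).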