Skip to content

Instantly share code, notes, and snippets.

@dch
Created April 19, 2013 18:15
Show Gist options
  • Save dch/5422142 to your computer and use it in GitHub Desktop.
(ns factual.riemann.latency
(:use riemann.streams
clojure.tools.logging))
(def warning-latencies
  "Map from full service name to that service's warning threshold, as used by
  characterize-latency: a latency metric strictly greater than this value is
  flagged as a warning. The names listed below omit their shared \"dsapi \"
  prefix, which is prepended here. Units are presumably milliseconds (same
  units as the event :metric) -- TODO confirm."
  (reduce-kv (fn [acc suffix threshold]
               (assoc acc (str "dsapi " suffix) threshold))
             {}
             {"extract post" 5000
              "get diffs"    1000
              "get inputs"   1000
              "post inputs"  25000
              "put resolve"  1000}))
(def critical-latencies
  "Map from full service name to that service's critical threshold: four times
  its entry in warning-latencies. A latency metric strictly greater than this
  value is flagged as critical by characterize-latency."
  (reduce-kv (fn [acc service threshold]
               (assoc acc service (* 4 threshold)))
             {}
             warning-latencies))
(defn characterize-latency
  "Returns a stream that sets each event's :state to \"ok\", \"warning\", or
  \"critical\" based on its latency (:metric), then passes it to children.

  warning-latencies  - map of service name to maximum OK latency; a metric
                       strictly above it is \"warning\". Services missing
                       from the map use 10000.
  critical-latencies - map of service name to maximum warning latency; a
                       metric strictly above it is \"critical\". Services
                       missing from the map use 40000.

  Events without a metric are forwarded unchanged. States are strings so they
  match the rest of this namespace (rates sets \"ok\"; my-apdex tests for
  \"ok\"/\"warning\" and emits string states)."
  [warning-latencies critical-latencies & children]
  (apply smap
         (fn [event]
           (if-let [metric (:metric event)]
             (let [service (:service event)]
               ;; condp < evaluates (< threshold metric): the critical
               ;; threshold is checked first and both thresholds are exclusive.
               (assoc event :state
                      (condp < metric
                        (get critical-latencies service 40000) "critical"
                        (get warning-latencies service 10000) "warning"
                        "ok")))
             ;; Nothing to judge: pass the event through untouched.
             event))
         children))
(defn rates
  "Returns a stream that computes request rates two ways: per service (with
  :host cleared) and per host+service pair. Every incoming event is counted
  with :metric 1 (and :ttl 3600) and fed to (rate 5 ...). Resulting rate events
  get \" rate\" appended to their :service, :state \"ok\" and :tags [\"rate\"],
  and are then passed to children."
  [& children]
  (let [labelled (apply with {:state "ok" :tags ["rate"]} children)
        out      (adjust [:service str " rate"] labelled)]
    (with {:metric 1 :ttl 3600}
      ;; One rate stream per service, across all hosts.
      (by :service
        (with :host nil
          (rate 5 out)))
      ;; One rate stream per (host, service) pair.
      (by [:host :service]
        (rate 5 out)))))
(defn my-apdex
  "Returns a Riemann apdex stream over dt (presumably seconds -- confirm)
  that treats events whose state is \"ok\" as satisfied and \"warning\" as
  tolerated. Each resulting score event is given its own :state (\"ok\" when
  the score is above 0.9, \"warning\" when above 0.75, otherwise \"critical\")
  and has \" apdex\" appended to its :service before reaching children."
  [dt & children]
  (let [score->state (fn [score]
                       (cond
                         (< 0.9 score)  "ok"
                         (< 0.75 score) "warning"
                         :else          "critical"))
        out          (apply adjust [:service str " apdex"] children)]
    (apdex dt (state "ok") (state "warning")
           ;; Categorize the apdex event's state based on its metric.
           (smap (fn [event]
                   (assoc event :state (score->state (:metric event))))
                 out))))
(defn latencies
  "Handle all latency events. Computes latency distributions, throughput rates,
  and apdex figures.

  Returns one stream (built with sdo) that sends each event to two branches:
  - rates, which sees the raw events before any :state is assigned;
  - characterize-latency (using warning-latencies and critical-latencies),
    which sets :state and then, per :service with :host set to nil, feeds
    both my-apdex (dt 5) and percentiles 60 over the 0.5, 0.95 and 0.99
    quantiles.

  All branches end in the same children: rate events directly; apdex events
  with :ttl 65 and :tags [\"apdex\"]; percentile events with :ttl 65. The
  apdex and percentile outputs pass through (interpolate-constant 5 ...)
  first."
  [& children]
  (sdo
    ; Compute request rates
    (apply rates children)
    ; Tag all events with a state appropriate for that service's latency.
    (characterize-latency
      warning-latencies critical-latencies
      ; One sub-stream per service; :host is cleared so all hosts reporting
      ; the same service are combined.
      (by :service
        (with :host nil
          ; Per-service apdex
          (my-apdex 5
            (with {:ttl 65 :tags ["apdex"]}
              (apply interpolate-constant 5 children)))
          ; Per-service latency distribution
          (percentiles 60 [0.5 0.95 0.99]
            (with :ttl 65
              ; NOTE(review): this inner by looks redundant inside the outer
              ; by :service, but presumably percentiles emits a distinct
              ; :service per quantile, so this gives each quantile its own
              ; interpolate-constant instance. Confirm before removing.
              (by :service
                (apply interpolate-constant 5 children)))))))))
; -*- mode: clojure; -*-
; vim: filetype=clojure
; Initialize logging to this file.
(logging/init :file "/var/log/riemann/riemann.log")
; Load libraries: evaluate latency.clj (presumably the file defining the
; factual.riemann.latency namespace -- confirm) and alias it as `latency`.
(include "latency.clj")
(require ['factual.riemann.latency :as 'latency])
; Listen on the 10.20.10.163 interface over TCP (5555), UDP (5555), and
; websockets (5556). No ports are set below, so the servers' defaults apply.
(let [listen-host "10.20.10.163"
      server-opts [:host listen-host]]
  ;; All three listeners bind to the same address; ports are left unset.
  (apply tcp-server server-opts)
  (apply udp-server server-opts)
  (apply ws-server server-opts))
; Expire old events from the index every 5 seconds (events whose TTL has
; elapsed; the index below defaults :ttl to 30).
(periodically-expire 5)
(defonce graph
  ;; Graphite sink handed to latency/latencies as one of its children.
  ;; defonce keeps the same client if this config is re-evaluated.
  (let [graphite-opts {:host        "graphite-backend.corp.factual.com"
                       :port        2003
                       :block-start false}]
    (graphite graphite-opts)))
; Keep events in the index for 30 seconds by default (:ttl 30).
(let [index     (default :ttl 30 (update-index (index)))
      latencies (latency/latencies index graph)]
  ; Inbound events will be passed to these streams:
  (streams
    ; Route by service name: "dsapi..." and "front..." services go through
    ; the latency pipeline (whose children are the index and graphite);
    ; everything else falls through to the trailing `index`. splitp must be
    ; closed right after that fallback, otherwise the rate and expired
    ; streams below become extra splitp clauses (and the let is unbalanced).
    (splitp re-find service
      #"dsapi.+" latencies
      #"front.+" latencies
      index)
    ; Calculate an overall rate of events.
    (with {:metric 1 :host nil :state "ok" :service "riemann events/sec"}
      (rate 1/4 index))
    ; Log expired events.
    (expired
      (fn [event] (info "expired" event)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment