Created
September 8, 2014 14:07
-
-
Save drbobbeaty/f2eca2d254971f5b1257 to your computer and use it in GitHub Desktop.
Clojure code to get the bolt metrics for a Storm Topology
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns gym.storm.nimbus | |
"Namespace for exercising Nimbus to find out facts about the specific storm | |
cluster. This data can be used to monitor the cluster and look at stats in | |
a way that's got very low load on the overall system." | |
(:require [backtype.storm.clojure :refer :all] | |
[backtype.storm.config :refer :all] | |
[backtype.storm.ui.core :refer :all] | |
[clj-endpoints :as ep] | |
[clj-endpoints.persistence.redis :refer [wcar]] | |
[clj-endpoints.util :as util] | |
[clojure.string :as cs] | |
[clojure.tools.logging :refer [error infof warnf]] | |
[taoensso.carmine :as car]) | |
(:import [org.apache.thrift7.transport TSocket TFramedTransport] | |
[org.apache.thrift7.protocol TBinaryProtocol] | |
[backtype.storm.generated Nimbus$Client SupervisorSummary])) | |
(defn get-emitted-totals | |
"Function to take a config map and a topology name, and query Nimbus to | |
see what the `emitted` totals are for each of the bolts in the topology. This | |
is a very lightweight way to keep track of what's going on in the topology | |
without monitoring all the messages coming out of the kafka cluster." | |
[cluster topo] | |
(let [cfg (ep/config cluster) | |
tft (TFramedTransport. (TSocket. (:nimbus cfg) 6627)) | |
nc (Nimbus$Client. (TBinaryProtocol. tft))] | |
(try | |
(.open tft) | |
(let [ci (.getClusterInfo nc) | |
ts (first (filter #(= topo (.get_name %)) (.get_topologies ci))) | |
ti (.getTopologyInfo nc (.get_id ts)) | |
exes (.get_executors ti) | |
cnts (into {} (for [[k v] (group-by #(.get_component_id %) exes)] | |
[(keyword k) (util/safe-sum (map get-emitted v))]))] | |
(.close tft) | |
{ :nimbus-host (:nimbus cfg) | |
:topology topo | |
:executors (count exes) | |
:counts cnts }) | |
(catch Exception e (warnf "Exception thrown: %s" (.getMessage e)))))) | |
(defn get-capacity | |
"Function to take a config map and a topology name, and query Nimbus to | |
see what the capacity is for each of the bolts in the topology. This | |
is a very lightweight way to keep track of what's going on in the topology | |
without monitoring all the messages coming out of the kafka cluster." | |
[cluster topo] | |
(let [cfg (ep/config cluster) | |
tft (TFramedTransport. (TSocket. (:nimbus cfg) 6627)) | |
nc (Nimbus$Client. (TBinaryProtocol. tft))] | |
(try | |
(.open tft) | |
(let [ci (.getClusterInfo nc) | |
ts (first (filter #(= topo (.get_name %)) (.get_topologies ci))) | |
tid (.get_id ts) | |
ti (.getTopologyInfo nc tid) | |
st (.getTopology nc tid) | |
exes (.get_executors ti) | |
bolts (group-by-comp (filter (partial bolt-summary? st) exes)) | |
caps (into {} (for [[id bc] bolts] | |
[(keyword id) (util/to-4dp (compute-bolt-capacity bc))]))] | |
(.close tft) | |
{ :nimbus-host (:nimbus cfg) | |
:topology topo | |
:executors (count exes) | |
:capacity caps }) | |
(catch Exception e (warnf "Exception thrown: %s" (.getMessage e)))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment