Created
March 2, 2014 17:41
-
-
Save daggerrz/9310425 to your computer and use it in GitHub Desktop.
GraphiteReporter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tapad.common.metrics | |
import java.net.Socket | |
import java.io._ | |
import com.tapad.common.log.Logging | |
import com.twitter.ostrich.stats._ | |
import java.util.TimerTask | |
object GraphiteReporter {

  /**
   * Starts a Graphite reporter and registers the `Stats` collection
   * (imported from `com.twitter.ostrich.stats`) with it.
   *
   * The reporter instance is not returned; this method evaluates to `Unit`.
   *
   * @param prefix         prefix handed to the reporter for metric names
   * @param host           Graphite host to connect to
   * @param port           Graphite port to connect to
   * @param reportInterval interval between flushes, passed to the reporter unchanged
   */
  def apply(prefix: String, host: String, port: Int, reportInterval: Long): Unit =
    new GraphiteReporter(prefix, host, port, reportInterval).add(Stats)
}
/**
 * Periodically flushes registered stats collections to Graphite.
 *
 * Constructing an instance immediately starts a `java.util.Timer` named
 * "GraphiteReporter" whose task runs first after `reportInterval` and then
 * again every `reportInterval` (the value is passed to `Timer.schedule` as both
 * delay and period, i.e. milliseconds).
 *
 * @param prefix         prefix forwarded to the underlying reporter state
 * @param host           Graphite host
 * @param port           Graphite port
 * @param reportInterval initial delay and period of the flush task
 */
class GraphiteReporter(prefix: String, host: String, port: Int, reportInterval: Long) extends Logging {

  private val state = new GraphiteReporterState(prefix, host, port)

  /** The periodic flush job; any failure is logged and the job keeps being scheduled. */
  private val flushTask: TimerTask = new TimerTask {
    override def run(): Unit = {
      try {
        log.debug("Flushing stats to Graphite.")
        state.flush()
        log.debug("Graphite flush completed.")
      } catch {
        // Everything is caught here on purpose: an exception escaping run()
        // would cancel the Timer and stop all further reporting.
        case t: Throwable =>
          log.warn("Error flushing. Data will be retained for next flush...", t)
      }
    }
  }

  new java.util.Timer("GraphiteReporter").schedule(flushTask, reportInterval, reportInterval)

  /**
   * Adds a stats collection to the set that is written on every flush.
   */
  def add(stats: StatsCollection): Unit = state.add(stats)
}
/**
 * Holds the Graphite connection and the set of stats collections to report.
 *
 * This class does not start a thread of its own; `flush()` is expected to be
 * invoked by a caller (in this file, the timer task of `GraphiteReporter`).
 * All access to the mutable fields happens inside `synchronized` blocks on
 * this instance.
 *
 * @param prefix prefix passed to `Graphite.writeGraphiteMsg` for every collection
 * @param host   Graphite host to connect to
 * @param port   Graphite port to connect to
 */
private[metrics] class GraphiteReporterState(prefix: String, host: String, port: Int) extends Logging {
  // null while disconnected; both fields are always set and cleared together.
  private var socket: Socket = _
  private var writer: OutputStreamWriter = _
  private var collections: Set[StatsCollection] = Set()

  /**
   * Opens the socket and writer unless a usable connection already exists.
   *
   * `Socket.isConnected` keeps returning true after a socket has been closed,
   * so `isClosed` is checked as well.
   */
  private def connect(): Unit = synchronized {
    if (socket == null || socket.isClosed || !socket.isConnected) {
      log.info("Connecting to {}...", host + ":" + port)
      val fresh = new Socket(host, port)
      try {
        // Explicit charset so the bytes on the wire do not depend on the
        // platform default encoding.
        writer = new OutputStreamWriter(fresh.getOutputStream, "UTF-8")
      } catch {
        case e: IOException =>
          // Do not leak the half-initialised socket.
          try fresh.close() catch { case _: IOException => () }
          throw e
      }
      socket = fresh
      log.info("Connected.")
    }
  }

  /**
   * Closes the socket (if any) and forgets both socket and writer so the next
   * `connect()` starts from scratch. A failure while closing is logged rather
   * than thrown, so it cannot mask the error that triggered the disconnect.
   */
  private def disconnect(): Unit = synchronized {
    log.info("Disconnecting...")
    try {
      if (socket != null && !socket.isClosed) socket.close()
    } catch {
      case e: IOException => log.warn("Error closing Graphite socket.", e)
    } finally {
      socket = null
      writer = null
    }
    log.info("Disconnected.")
  }

  /**
   * Writes every registered collection to Graphite and clears it afterwards.
   *
   * A collection is only cleared after its write returned without throwing,
   * so a failed write leaves its data in place for the next flush. On an
   * `IOException` the connection is dropped and the exception is rethrown.
   *
   * NOTE(review): values recorded between the snapshot taken inside
   * `Graphite.writeGraphiteMsg` and the `clearAll()` call appear to be
   * discarded unreported - confirm whether that is acceptable.
   *
   * @throws IOException if connecting or writing fails
   */
  def flush(): Unit = synchronized {
    connect()
    try {
      val now = System.currentTimeMillis
      collections.foreach { coll =>
        Graphite.writeGraphiteMsg(prefix, now, coll, writer)
        coll.clearAll()
      }
    } catch {
      case e: IOException =>
        disconnect()
        throw e
    }
  }

  /**
   * Registers a collection to be written on every subsequent flush.
   * Adding the same collection twice has no additional effect (it is kept in a `Set`).
   */
  def add(s: StatsCollection): Unit = synchronized {
    collections += s
  }
}
/**
 * Formats stats collections as Graphite plaintext lines
 * (`<name> <value> <epoch-seconds>`, one per line) and writes them to a `Writer`.
 */
object Graphite extends Logging {

  /** Percentiles reported for each metric distribution, as fractions. */
  private val Percentiles = List(0.90, 0.95, 0.99)

  /**
   * Convert a long value into a Graphite line.
   */
  private def toGraphiteLine(name: String, value: Long, epoch: Long): String =
    name + " " + value + " " + epoch

  /**
   * Convert a double value into a Graphite line, rendering the value with `df`.
   */
  private def toGraphiteLine(name: String, value: Double, epoch: Long)(implicit df: java.text.DecimalFormat): String =
    name + " " + df.format(value) + " " + epoch

  /**
   * Write a stats collection to a Graphite writer.
   *
   * For every metric distribution the lines `<name>_p90`, `<name>_p95`,
   * `<name>_p99`, `<name>_count` and `<name>_average` are emitted, followed by
   * one line per counter and one line per gauge. Every name is prefixed with
   * `prefix` and a dot. The writer is flushed once after all lines are written.
   *
   * @param prefix    prefix prepended (with a ".") to every metric name
   * @param timestamp time of the report in milliseconds; it is divided by 1000
   *                  to obtain the epoch seconds written on each line
   * @param stats     collection to snapshot via `stats.get()`
   * @param out       destination writer; not closed by this method
   */
  def writeGraphiteMsg(prefix: String, timestamp: Long, stats: StatsCollection, out: Writer): Unit = {
    // Pin the symbols to an English locale: with the JVM default locale a
    // comma-decimal locale would render gauges as "1,50", which is not a valid
    // number in Graphite's plaintext protocol.
    implicit val df: java.text.DecimalFormat =
      new java.text.DecimalFormat("#0.00", new java.text.DecimalFormatSymbols(java.util.Locale.US))

    val epoch = timestamp / 1000
    val report = stats.get()

    val metrics = report.metrics.toList.flatMap {
      case (name, distribution) =>
        val histogram = distribution.histogram
        val values =
          Percentiles.map(p => ("p" + (p * 100).toInt) -> histogram.getPercentile(p).toLong) ++
            List("count" -> histogram.count, "average" -> distribution.average.toLong)
        // The suffixes are unique, so no de-duplication through a Map is needed;
        // keeping the list also keeps the emitted order stable.
        values.map {
          case (metric, value) =>
            toGraphiteLine(prefix + "." + name + "_" + metric, value, epoch)
        }
    }

    val counters = report.counters.toList.map {
      case (name, value) =>
        toGraphiteLine(prefix + "." + name, value, epoch)
    }

    val gauges = report.gauges.toList.map {
      case (name, value) =>
        toGraphiteLine(prefix + "." + name, value, epoch)
    }

    (metrics ++ counters ++ gauges).foreach { line =>
      log.debug(line)
      out.write(line)
      out.write('\n')
    }
    // One flush for the whole batch instead of one per line.
    out.flush()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment