//==================================================================
// SPARK INSTRUMENTATION
//==================================================================
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.{SparkEnv, Accumulator}
import org.apache.spark.metrics.source.Source
import org.joda.time.DateTime
import scala.collection.mutable

/** Instrumentation for Spark based on accumulators.
  *
  * Usage:
  *   val instrumentation = new SparkInstrumentation("example.metrics")
  *   val numReqs = sc.accumulator(0L)
  *   instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
  *   instrumentation.register()
  *
  * Will create and report the following metrics:
  * - Gauge with total number of requests (daily)
  * - Meter with rate of requests
  *
  * @param prefix prefix for all metrics that will be reported by this Instrumentation
  */
class SparkInstrumentation(prefix: String) extends Serializable {
  val accumulators = mutable.Set[Accumulator[Long]]()

  private class InstrumentationSource(prefix: String) extends Source {
    val metricRegistry = new MetricRegistry
    val sourceName = prefix
    val oldgauges = mutable.Map[String, Long]()
    val oldtimes = mutable.Map[String, DateTime]()
    val meters = mutable.Map[String, Meter]()

    /** Computes metrics based on accumulator. Gauge never resets.
      *
      * @param a Metrics will be derived from this accumulator
      * @param name Name of the metrics
      */
    def registerAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            oldgauges(name) = a.value
            a.value
          }
        })
    }

    /** Computes metrics based on accumulator. Gauge resets at the end of the day.
      *
      * @param a Metrics will be derived from this accumulator
      * @param name Name of the metrics
      */
    def registerDailyAccumulator(a: Accumulator[Long], name: String) {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      oldtimes += (name -> DateTime.now)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            meters(name).mark(a.value - oldgauges(name))
            val now = DateTime.now
            if (now.getDayOfMonth != oldtimes(name).getDayOfMonth) {
              a.setValue(0L)
            }
            oldtimes(name) = now
            oldgauges(name) = a.value
            a.value
          }
        })
    }
  }

  val source = new InstrumentationSource(prefix)

  /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */
  def register() {
    SparkEnv.get.metricsSystem.registerSource(source)
  }
}
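Registering the source only makes the metrics visible to Spark's MetricsSystem; nothing leaves the driver unless a sink is also configured. A minimal metrics.properties sketch (host and port are placeholders) that would forward these metrics to Graphite via the built-in GraphiteSink:

# conf/metrics.properties, or the file pointed to by spark.metrics.conf
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

Spark picks this file up from $SPARK_HOME/conf/metrics.properties, or from the path set in the spark.metrics.conf configuration property.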
//============================================
// STREAMING LAUNCHER / SERVER
//============================================
import scalax.io.Resource
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.yarn.api.records.{ApplicationReport, YarnApplicationState, ApplicationId}
import org.apache.hadoop.yarn.client.ClientRMProxy
import org.apache.hadoop.yarn.api.ApplicationClientProtocol
import org.apache.hadoop.yarn.api.protocolrecords.{GetApplicationsResponse, GetApplicationsRequest}
import org.eclipse.jetty.server.{Request, Handler, Server}
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, AbstractHandler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import com.lambdaworks.jacks.JacksMapper
import scala.annotation.tailrec
import org.eclipse.jetty.util.thread.QueuedThreadPool
import scala.util.{Failure, Success, Try}

/** Local launcher client for streaming applications in YARN.
  *
  * Extends the usual Spark YARN client for streaming applications. This class should not be called by the user;
  * instead, the StreamingClient object is the entry point for launching applications.
  *
  * @param args User supplied arguments
  * @param sparkConf Spark Configuration
  */
class StreamingClient(args: ClientArguments, sparkConf: SparkConf) extends Client(args, sparkConf) {
  import scala.collection.JavaConversions._
  import scala.collection.JavaConverters._

  var appIdOption: Option[ApplicationId] = None
  val clientHttp = new ClientHttp(this)
  val launcherArgs = args

  /** Connects to or launches an application in the YARN cluster.
    *
    * 1. Search for an existing application with the same name.
    * 2. If found, monitor the existing application.
    * 3. If not found, launch a new application with this name, and monitor it.
    */
  override def run() {
    sparkConf.set("spark.yarn.report.interval", "10000")
    clientHttp.bind()
    println("Using yarn at " + yarnConf.getRaw("fs.defaultFS"))
    val pidFile = System.getenv("SPARK_LAUNCHER_PID_DIR") match {
      case "" => None
      case x => Some(Resource.fromFile("%s/%s.appInfo".format(x, args.appName)))
    }
    val instances = getSparkApplications(
      setAsJavaSet(Set("SPARK")),
      java.util.EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED,
        YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED)
    ).flatMap { report =>
      if (report.getName == args.appName) {
        Some(report.getApplicationId)
      } else {
        None
      }
    }.toList
    if (instances.size > 0) {
      println("Application already running. Monitoring old application.")
      init(yarnConf)
      start()
      appIdOption = Some(instances.head)
      monitorApplication(appIdOption.get)
      System.exit(0)
    } else {
      //super.stop()
      println("Application not found.")
      appIdOption = Some(runApp())
      if (!appIdOption.isDefined) {
        println("Application didn't start correctly")
        System.exit(1)
      }
      if (pidFile.isDefined) {
        pidFile.get.write("%s %s".format(appIdOption.get.toString, args.userArgs.mkString(" ")))
        System.exit(0)
      }
      monitorApplication(appIdOption.get)
      System.exit(0)
    }
  }

  /** Gets the list of Spark applications in the YARN cluster. */
  def getSparkApplications(applicationTypes: java.util.Set[String],
      applicationStates: java.util.EnumSet[YarnApplicationState]): java.util.List[ApplicationReport] = {
    setConfig(yarnConf)
    val rmClient = ClientRMProxy.createRMProxy(getConfig, classOf[ApplicationClientProtocol])
    val request: GetApplicationsRequest = GetApplicationsRequest.newInstance(applicationTypes, applicationStates)
    val response: GetApplicationsResponse = rmClient.getApplications(request)
    response.getApplicationList
  }
}
/** Entry point for the local launcher client for streaming applications in YARN.
  *
  * Usage:
  *   java -cp /etc/hadoop/conf:AppJar.jar:spark-assembly.jar org.apache.spark.yarn.StreamingClient --jar AppJar.jar \
  *     --addJars /jars/config.jar --class ooyala.app.MainClass --arg arg1 --arg arg2 --name MyApp
  */
object StreamingClient {
  def main(argStrings: Array[String]) {
    // Set an env variable indicating we are running in YARN mode.
    // Note: any env variable with the SPARK_ prefix gets propagated to all (remote) processes -
    // see Client#setupLaunchEnv().
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf()
    val args = new ClientArguments(argStrings, sparkConf)
    new StreamingClient(args, sparkConf).run()
  }
}
/** Starts an HTTP server for the launcher client.
  *
  * Allows checking the health of the application launcher by querying the /healthz route.
  *
  * @param launcher Server will track the status of this launcher client.
  */
class ClientHttp(val launcher: StreamingClient) extends Logging {
  val port = 8081
  var boundPort: Option[Int] = None
  var server: Option[Server] = None

  val handlers = Seq[(String, Handler)](
    ("/healthz", healthHandler)
  )

  /** /healthz route handler.
    *
    * Reports the health of the launcher and publishes information about the application.
    */
  def healthHandler: Handler = {
    new AbstractHandler {
      override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
          response: HttpServletResponse): Unit = {
        response.setContentType("application/json")
        if (!launcher.appIdOption.isDefined) {
          response.setStatus(HttpServletResponse.SC_NOT_FOUND)
          val res = Map(
            "LauncherStatus" -> "Application Not Found"
          )
          baseRequest.setHandled(true)
          response.getWriter.println(JacksMapper.writeValueAsString(res))
          return
        }
        val report = launcher.getApplicationReport(launcher.appIdOption.get)
        response.setStatus(HttpServletResponse.SC_OK)
        baseRequest.setHandled(true)
        val res = Map(
          "LauncherStatus" -> "Online",
          "YarnCluster" -> launcher.yarnConf.getRaw("fs.defaultFS"),
          "ApplicationId" -> launcher.appIdOption.get.toString,
          "ApplicationStatus" -> report.getYarnApplicationState,
          "StartedAt" -> report.getStartTime.toString,
          "TrackingURL" -> report.getTrackingUrl.toString,
          "ApplicationName" -> launcher.launcherArgs.appName
        )
        response.getWriter.println(JacksMapper.writeValueAsString(res))
      }
    }
  }

  /**
    * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
    *
    * If the desired port number is contended, continues incrementing ports until a free port is
    * found. Returns the chosen port and the Jetty Server object.
    */
  def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
    val handlersToRegister = handlers.map { case (path, handler) =>
      val contextHandler = new ContextHandler(path)
      contextHandler.setAllowNullPathInfo(true)
      contextHandler.setHandler(handler)
      contextHandler
    }
    val handlerList = new HandlerList
    handlerList.setHandlers(handlersToRegister.toArray)

    @tailrec
    def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(currentPort)
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(handlerList)
      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          server.stop()
          logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
          logInfo("Error was: " + f.toString)
          connect((currentPort + 1) % 65536)
      }
    }
    connect(port)
  }

  def bind() {
    try {
      val (srv, usedPort) = startJettyServer("0.0.0.0", port, handlers)
      logInfo("Started Streaming Launcher UI at port %d".format(usedPort))
      server = Some(srv)
      boundPort = Some(usedPort)
    } catch {
      case e: Exception =>
        logError("Failed to create Spark JettyUtils", e)
        System.exit(1)
    }
  }
}
//===============================================
// DATADOG SINK
//===============================================
import com.codahale.metrics.MetricRegistry
// Requires the com.clipperz.metrics-datadog artifact (not in Maven)
// Compile from https://github.com/clipperz/metrics-datadog
import com.codahale.metrics.reporting.{DatadogReporter, HttpTransport}
import java.util.concurrent.TimeUnit
import java.util.Properties
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.sink.Sink

/** Sink to report metrics to Datadog. */
class DatadogSink(val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink {
  val DD_KEY_PERIOD = "period"
  val DD_DEFAULT_PERIOD = 10L
  val DD_KEY_UNIT = "unit"
  val DD_DEFAULT_UNIT = TimeUnit.SECONDS
  val DD_API_KEY = "apikey"
  val DD_KEY_HOST = "host"
  val DD_DEFAULT_HOST = ""

  def propertyToOption(prop: String) = Option(property.getProperty(prop))

  if (!propertyToOption(DD_API_KEY).isDefined) {
    throw new Exception("Datadog sink requires 'apikey' property.")
  }

  val pollPeriod = propertyToOption(DD_KEY_PERIOD).map(_.toLong)
    .getOrElse(DD_DEFAULT_PERIOD)
  val pollUnit = propertyToOption(DD_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
    .getOrElse(DD_DEFAULT_UNIT)
  val host = propertyToOption(DD_KEY_HOST).getOrElse(DD_DEFAULT_HOST)
  val apikey = propertyToOption(DD_API_KEY).get

  val transport = new HttpTransport("app.datadoghq.com", apikey)
  val reporter = DatadogReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build(transport, host)

  override def start {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop {
    reporter.stop()
  }
}
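To wire this sink up, reference it from the metrics configuration by its fully qualified class name. The gist does not show a package declaration, so the class path below is an assumption; adjust it to wherever DatadogSink is actually compiled. The property keys match the ones read in the constructor above (apikey, period, unit, host):

# metrics.properties sketch; class path is an assumption
*.sink.datadog.class=org.apache.spark.metrics.sink.DatadogSink
*.sink.datadog.apikey=<your Datadog API key>
*.sink.datadog.period=10
*.sink.datadog.unit=seconds
*.sink.datadog.host=my-driver-host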
I tried mimicking the custom metrics class and registered it, but sadly it does not get written to the Graphite sink.
This is Spark Structured Streaming, so I am not sure whether there is a limitation there, but I believe there shouldn't be one. Attaching code snippets.
package com.hari.spark.kafka.struct.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.Dataset
import org.apache.spark.Accumulator
import com.codahale.metrics._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types.StructType

object ReadWriteKafka {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("TestGrafana").getOrCreate
    import sparkSession.implicits._

    val rowsPassed = sparkSession.sparkContext.accumulator[Long](0, "rowsPassed")
    CustomRowsProcessedMetrics.regiserCustomMetrics("rowsPassed", rowsPassed)

    val readFromKafka = sparkSession.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("subscribe", "GrafanaSource")
      .option("enable.auto.commit", "false")
      .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      .option("failOnDataLoss", "false")
      .option("auto.offset.reset", "latest")
      .load

    // perform some transformation
    //implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    val keyValueTuples = readFromKafka.selectExpr("CAST(key as STRING)", "CAST(value as STRING)").as[(String, String)]
    val rowProcessedStore = countRowsProcessed(rowsPassed,
      keyValueTuples.filter(tup => tup._2.contains("ichigo")).map(tup => (tup._1, tup._2.toUpperCase)))
    rowProcessedStore.printSchema

    val writeToKafka = rowProcessedStore.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "10.65.137.104:9092")
      .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("checkpointLocation", "/Hari/hdfs/staging1/")
      .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      .option("topic", "GrafanaTarget")
      .start

    writeToKafka.awaitTermination
  }

  def countRowsProcessed(acc: Accumulator[Long], ds: Dataset[(String, String)]): Dataset[(String, String)] = {
    implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
    ds.map { x =>
      acc += 1
      x
    }.toDF("key", "value").as[(String, String)]
  }
}
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.Accumulator
import scala.collection.mutable.Map
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.CustomMetrics

package com.hari.spark.kafka.struct.streaming {
  object CustomRowsProcessedMetrics extends Serializable {
    def regiserCustomMetrics(metricsName: String, acc: Accumulator[Long]) {
      import org.apache.spark.metrics.source.CustomMetrics
      val customMets = new CustomMetrics(metricsName, Map[String, Long](), Map[String, Meter]())
      customMets.regGaugeAndMeter(acc, metricsName)
      SparkEnv.get.metricsSystem.registerSource(customMets)
      val test = SparkEnv.get.metricsSystem.getSourcesByName(metricsName)
      println(test)
    }
  }
}

package org.apache.spark.metrics.source {
  class CustomMetrics(metricsName: String, gauge1: Map[String, Long], metrics: Map[String, Meter]) extends Source {
    def metricRegistry = new MetricRegistry()
    def sourceName = metricsName

    // update the metrics with time series
    def regGaugeAndMeter(acc: Accumulator[Long], metricsName: String): Unit = {
      import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
      gauge1 += (metricsName -> 0L)
      metrics += (metricsName -> metricRegistry.meter(metricsName + "-rate"))
      metricRegistry.register(
        MetricRegistry.name(metricsName),
        new Gauge[Long] {
          override def getValue: Long = {
            metrics(metricsName).mark(acc.value - gauge1(metricsName))
            gauge1(metricsName) = acc.value
            println("The incremented values are ---> " + acc.value)
            return acc.value
          }
        })
    }
  }
}
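A likely reason the gauge never reaches the Graphite sink (my reading of the snippet above, not confirmed in the thread): metricRegistry is declared as a def, so every access creates a brand-new MetricRegistry. The registry that regGaugeAndMeter registers the gauge into is therefore not the one the MetricsSystem reports from when the source is registered. A minimal sketch of the corrected source, keeping everything else as above:

import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.Accumulator
import scala.collection.mutable.Map

package org.apache.spark.metrics.source {
  class CustomMetrics(metricsName: String, gauge1: Map[String, Long], metrics: Map[String, Meter]) extends Source {
    // val, not def: keep a single registry instance so the gauge registered below
    // lives in the same registry that SparkEnv.get.metricsSystem reports from
    val metricRegistry = new MetricRegistry()
    val sourceName = metricsName

    def regGaugeAndMeter(acc: Accumulator[Long], name: String): Unit = {
      gauge1 += (name -> 0L)
      metrics += (name -> metricRegistry.meter(name + "-rate"))
      metricRegistry.register(MetricRegistry.name(name), new Gauge[Long] {
        override def getValue: Long = {
          // mark the meter with the delta since the last poll, then remember the new total
          metrics(name).mark(acc.value - gauge1(name))
          gauge1(name) = acc.value
          acc.value
        }
      })
    }
  }
}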
Sadly, in 2016 this is still the best solution for user-defined accumulator metrics.