-
-
Save skp33/7fb302c4b26a4b4d495115cc5c15a788 to your computer and use it in GitHub Desktop.
Spark productionizing utilities developed by Ooyala, presented at Spark Summit 2014
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//================================================================== | |
// SPARK INSTRUMENTATION | |
//================================================================== | |
import com.codahale.metrics.{MetricRegistry, Meter, Gauge} | |
import org.apache.spark.{SparkEnv, Accumulator} | |
import org.apache.spark.metrics.source.Source | |
import org.joda.time.DateTime | |
import scala.collection.mutable | |
/** Instrumentation for Spark based on accumulators.
  *
  * Usage:
  * {{{
  * val instrumentation = new SparkInstrumentation("example.metrics")
  * val numReqs = sc.accumulator(0L)
  * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
  * instrumentation.register()
  * }}}
  *
  * Will create and report the following metrics:
  *  - Gauge with total number of requests (daily)
  *  - Meter with rate of requests
  *
  * @param prefix prefix for all metrics that will be reported by this Instrumentation
  */
class SparkInstrumentation(prefix: String) extends Serializable {
  val accumulators = mutable.Set[Accumulator[Long]]()

  /** Metrics source holding one gauge and one "-rate" meter per registered accumulator.
    *
    * Deliberately not `private`: it is the type of the public `source` member and callers
    * invoke the `register*Accumulator` methods through it.
    *
    * @param prefix used as the source name
    */
  class InstrumentationSource(prefix: String) extends Source {
    val metricRegistry = new MetricRegistry
    val sourceName = prefix
    // Last accumulator value seen by each gauge, keyed by metric name.
    val oldgauges = mutable.Map[String, Long]()
    // Time of the last poll of each daily gauge, keyed by metric name.
    val oldtimes = mutable.Map[String, DateTime]()
    val meters = mutable.Map[String, Meter]()

    /** Computes metrics based on accumulator. Gauge never resets.
      *
      * Each time the gauge is read, the difference from the previously observed value is
      * marked on the `name + "-rate"` meter.
      *
      * @param a    Metrics will be derived from this accumulator
      * @param name Name of the metrics
      */
    def registerAccumulator(a: Accumulator[Long], name: String): Unit = {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            // Read the accumulator once so the meter delta, the remembered value and the
            // reported value all agree even if the accumulator changes during this call.
            val current = a.value
            meters(name).mark(current - oldgauges(name))
            oldgauges(name) = current
            current
          }
        })
    }

    /** Computes metrics based on accumulator. Gauge resets at the end of the day.
      *
      * The accumulator is set back to zero the first time the gauge is read on a calendar
      * date different from the date of the previous read.
      *
      * @param a    Metrics will be derived from this accumulator
      * @param name Name of the metrics
      */
    def registerDailyAccumulator(a: Accumulator[Long], name: String): Unit = {
      oldgauges += (name -> 0L)
      meters += (name -> metricRegistry.meter(name + "-rate"))
      oldtimes += (name -> DateTime.now)
      metricRegistry.register(MetricRegistry.name(name),
        new Gauge[Long] {
          override def getValue: Long = {
            // Single read of the accumulator; see registerAccumulator.
            val current = a.value
            meters(name).mark(current - oldgauges(name))
            val now = DateTime.now
            // Compare full calendar dates: comparing only the day-of-month would miss a
            // rollover that lands on the same day number of another month.
            val dayChanged = now.toLocalDate != oldtimes(name).toLocalDate
            oldtimes(name) = now
            if (dayChanged) {
              a.setValue(0L)
            }
            val reported = if (dayChanged) 0L else current
            oldgauges(name) = reported
            reported
          }
        })
    }
  }

  val source: InstrumentationSource = new InstrumentationSource(prefix)

  /** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */
  def register(): Unit = {
    SparkEnv.get.metricsSystem.registerSource(source)
  }
}
//============================================ | |
// STREAMING LAUNCHER / SERVER | |
//============================================ | |
import scalax.io.Resource | |
import org.apache.spark.deploy.yarn.{Client, ClientArguments} | |
import org.apache.spark.{Logging, SparkConf} | |
import org.apache.hadoop.yarn.api.records.{ApplicationReport, YarnApplicationState, ApplicationId} | |
import org.apache.hadoop.yarn.client.ClientRMProxy | |
import org.apache.hadoop.yarn.api.ApplicationClientProtocol | |
import org.apache.hadoop.yarn.api.protocolrecords.{GetApplicationsResponse, GetApplicationsRequest} | |
import org.eclipse.jetty.server.{Request, Handler, Server} | |
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, AbstractHandler} | |
import javax.servlet.http.{HttpServletResponse, HttpServletRequest} | |
import com.lambdaworks.jacks.JacksMapper | |
import scala.annotation.tailrec | |
import org.eclipse.jetty.util.thread.QueuedThreadPool | |
import scala.util.{Failure, Success, Try} | |
/** Local launcher client for streaming applications in YARN.
  *
  * Extends usual Spark YARN client for streaming applications. This class should not be called by the user,
  * instead, the StreamingClient object is the entry point for launching applications.
  *
  * @param args      User supplied arguments
  * @param sparkConf Spark Configuration
  */
class StreamingClient(args: ClientArguments, sparkConf: SparkConf) extends Client(args, sparkConf) {
  import scala.collection.JavaConverters._

  // Written by run() and read by the /healthz handler of `clientHttp`, which is served
  // from the HTTP server's threads; @volatile makes the update visible across threads.
  @volatile var appIdOption: Option[ApplicationId] = None
  val clientHttp = new ClientHttp(this)
  val launcherArgs = args

  /** Connects to or launches application in YARN cluster.
    *
    * 1. Search for existing application by the same name.
    * 2. If found, monitor existing application.
    * 3. If not found, launch new application with this name, and monitor.
    *
    * When the SPARK_LAUNCHER_PID_DIR environment variable is set to a non-empty value and a
    * new application is launched, the application id and user arguments are written to
    * `<dir>/<appName>.appInfo` and the process exits without monitoring.
    *
    * Every path through this method ends in `System.exit`.
    */
  override def run(): Unit = {
    sparkConf.set("spark.yarn.report.interval", "10000")
    clientHttp.bind()
    println("Using yarn at " + yarnConf.getRaw("fs.defaultFS"))

    // System.getenv returns null for an unset variable; treat unset and empty alike.
    val pidFile = Option(System.getenv("SPARK_LAUNCHER_PID_DIR"))
      .filter(_.nonEmpty)
      .map(dir => Resource.fromFile("%s/%s.appInfo".format(dir, args.appName)))

    val liveStates = java.util.EnumSet.of(
      YarnApplicationState.RUNNING,
      YarnApplicationState.ACCEPTED,
      YarnApplicationState.NEW,
      YarnApplicationState.NEW_SAVING,
      YarnApplicationState.SUBMITTED)
    val instances = getSparkApplications(Set("SPARK").asJava, liveStates).asScala
      .collect { case report if report.getName == args.appName => report.getApplicationId }
      .toList

    instances.headOption match {
      case Some(existingId) =>
        println("Application already running. Monitoring old application.")
        init(yarnConf)
        start()
        appIdOption = Some(existingId)
        monitorApplication(existingId)
        System.exit(0)

      case None =>
        println("Application not found.")
        // Option(...) rather than Some(...): a null id from runApp() now reaches the
        // failure branch below instead of being wrapped as Some(null).
        // NOTE(review): whether runApp() can actually return null is not visible here.
        appIdOption = Option(runApp())
        appIdOption match {
          case None =>
            println("Application didn't start correctly")
            System.exit(1)
          case Some(appId) =>
            pidFile.foreach { file =>
              file.write("%s %s".format(appId.toString, args.userArgs.mkString(" ")))
              System.exit(0)
            }
            monitorApplication(appId)
            System.exit(0)
        }
    }
  }

  /** Gets list of Spark applications in YARN cluster
    *
    * @param applicationTypes  application types to ask the resource manager for
    * @param applicationStates application states to ask the resource manager for
    * @return the application reports returned by the resource manager
    */
  def getSparkApplications(applicationTypes: java.util.Set[String], applicationStates: java.util.EnumSet[YarnApplicationState]): java.util.List[ApplicationReport] = {
    setConfig(yarnConf)
    val rmClient = ClientRMProxy.createRMProxy(getConfig, classOf[ApplicationClientProtocol])
    val request: GetApplicationsRequest = GetApplicationsRequest.newInstance(applicationTypes, applicationStates)
    val response: GetApplicationsResponse = rmClient.getApplications(request)
    response.getApplicationList
  }
}
/** Command-line entry point for the streaming application launcher.
  *
  * Usage:
  * java -cp /etc/hadoop/conf:AppJar.jar:spark-assembly.jar org.apache.spark.yarn.StreamingClient --jar AppJar.jar
  * --addJars /jars/config.jar --class ooyala.app.MainClass --arg arg1 --arg arg2 --name MyApp
  */
object StreamingClient {
  /** Parses the launcher arguments, builds a [[StreamingClient]] and runs it.
    *
    * @param argStrings raw command-line arguments, handed to `ClientArguments`
    */
  def main(argStrings: Array[String]): Unit = {
    // Flag YARN mode through the SPARK_YARN_MODE JVM system property.
    // NOTE(review): the original comment refers to SPARK_-prefixed settings being propagated
    // to remote processes (see Client#setupLaunchEnv()); that code is not visible here.
    System.setProperty("SPARK_YARN_MODE", "true")
    val conf = new SparkConf()
    val client = new StreamingClient(new ClientArguments(argStrings, conf), conf)
    client.run()
  }
}
/** Starts an HTTP server for the launcher client.
  *
  * Allows to check health of application launcher by querying /healthz route.
  *
  * @param launcher Server will track status of this launcher client.
  * @param port     First port to try when binding the server; defaults to 8081.
  */
class ClientHttp(val launcher: StreamingClient, val port: Int = 8081) extends Logging {
  var boundPort: Option[Int] = None
  var server: Option[Server] = None
  val handlers = Seq[(String, Handler)](
    ("/healthz", healthHandler)
  )

  /** /healthz route handler
    *
    * Reports health of the launcher and publishes information of the application.
    * Responds with 404 and a "LauncherStatus" of "Application Not Found" while the launcher
    * has no application id, otherwise with 200 and details from the YARN application report.
    */
  def healthHandler: Handler = {
    new AbstractHandler {
      override def handle(target: String, baseRequest: Request, request: HttpServletRequest, response: HttpServletResponse): Unit = {
        response.setContentType("application/json")
        // Read the application id exactly once so the check and the use cannot disagree.
        val body: Map[String, String] = launcher.appIdOption match {
          case None =>
            response.setStatus(HttpServletResponse.SC_NOT_FOUND)
            Map(
              "LauncherStatus" -> "Application Not Found"
            )
          case Some(appId) =>
            val report = launcher.getApplicationReport(appId)
            response.setStatus(HttpServletResponse.SC_OK)
            Map(
              "LauncherStatus" -> "Online",
              "YarnCluster" -> launcher.yarnConf.getRaw("fs.defaultFS"),
              "ApplicationId" -> appId.toString,
              "ApplicationStatus" -> report.getYarnApplicationState.toString,
              "StartedAt" -> report.getStartTime.toString,
              // The tracking URL may be null (e.g. before one has been assigned);
              // report an empty string instead of failing the health check with an NPE.
              "TrackingURL" -> Option(report.getTrackingUrl).getOrElse(""),
              "ApplicationName" -> launcher.launcherArgs.appName
            )
        }
        baseRequest.setHandled(true)
        response.getWriter.println(JacksMapper.writeValueAsString(body))
      }
    }
  }

  /**
    * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
    *
    * If the desired port number is contended, continues incrementing ports until a free port is
    * found. Returns the chosen port and the jetty Server object.
    *
    * @param ip       address to bind the server to
    * @param port     first port to try
    * @param handlers (context path, handler) pairs to register
    * @return the started server and the port it is listening on
    */
  def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
    val handlersToRegister = handlers.map { case (path, handler) =>
      val contextHandler = new ContextHandler(path)
      contextHandler.setAllowNullPathInfo(true)
      contextHandler.setHandler(handler)
      contextHandler
    }
    val handlerList = new HandlerList
    handlerList.setHandlers(handlersToRegister.toArray[Handler])

    @tailrec
    def connect(currentPort: Int): (Server, Int) = {
      // Bind to the requested address; previously `ip` was ignored.
      val server = new Server(new java.net.InetSocketAddress(ip, currentPort))
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(handlerList)
      Try(server.start()) match {
        case Success(_) =>
          (server, server.getConnectors.head.getLocalPort)
        case Failure(cause) =>
          server.stop()
          logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
          logInfo("Error was: " + cause.toString)
          // Wraps around after 65535 so the candidate always stays in the valid port range.
          connect((currentPort + 1) % 65536)
      }
    }

    connect(port)
  }

  /** Starts the HTTP server on all interfaces, beginning at `port`, and records the server
    * and the port actually used. Logs the error and exits the JVM with status 1 on failure.
    */
  def bind(): Unit = {
    try {
      val (srv, usedPort) = startJettyServer("0.0.0.0", port, handlers)
      logInfo("Started Streaming Launcher UI at port %d".format(usedPort))
      server = Some(srv)
      boundPort = Some(usedPort)
    } catch {
      case e: Exception =>
        logError("Failed to create Spark JettyUtils", e)
        System.exit(1)
    }
  }
}
//=============================================== | |
// DATADOG SINK | |
//=============================================== | |
import com.codahale.metrics.MetricRegistry | |
// Requires com.clipperz.metrics-datadog artifact (not in Maven) | |
// Compile from https://github.com/clipperz/metrics-datadog
import com.codahale.metrics.reporting.{DatadogReporter, HttpTransport} | |
import java.util.concurrent.TimeUnit | |
import java.util.Properties | |
import org.apache.spark.metrics.sink.Sink | |
/** Metrics sink that reports the contents of a `MetricRegistry` to Datadog.
  *
  * Recognised properties:
  *  - `apikey` (required): Datadog API key; construction fails without it
  *  - `period` (optional): polling period, defaults to 10
  *  - `unit`   (optional): `TimeUnit` name for the period (case-insensitive), defaults to SECONDS
  *  - `host`   (optional): host passed to the reporter, defaults to the empty string
  *
  * @param property    sink configuration properties
  * @param registry    registry whose metrics are reported
  * @param securityMgr not used by this sink
  */
class DatadogSink(val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink {
  val DD_KEY_PERIOD = "period"
  val DD_DEFAULT_PERIOD = 10L

  val DD_KEY_UNIT = "unit"
  val DD_DEFAULT_UNIT = TimeUnit.SECONDS

  val DD_API_KEY = "apikey"

  val DD_KEY_HOST = "host"
  val DD_DEFAULT_HOST = ""

  /** Looks up a property, yielding `None` when it is not set. */
  def propertyToOption(prop: String) = Option(property.getProperty(prop))

  // Fail fast, before any other setting is parsed, when the API key is missing.
  if (propertyToOption(DD_API_KEY).isEmpty) {
    throw new Exception("Datadog sink requires 'apikey' property.")
  }

  val pollPeriod = propertyToOption(DD_KEY_PERIOD) match {
    case Some(rawPeriod) => rawPeriod.toLong
    case None            => DD_DEFAULT_PERIOD
  }

  val pollUnit = propertyToOption(DD_KEY_UNIT) match {
    case Some(rawUnit) => TimeUnit.valueOf(rawUnit.toUpperCase)
    case None          => DD_DEFAULT_UNIT
  }

  val host = propertyToOption(DD_KEY_HOST) match {
    case Some(configuredHost) => configuredHost
    case None                 => DD_DEFAULT_HOST
  }

  // Presence was verified above, so .get cannot fail here.
  val apikey = propertyToOption(DD_API_KEY).get

  val transport = new HttpTransport("app.datadoghq.com", apikey)

  val reporter = {
    val builder = DatadogReporter.forRegistry(registry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
    builder.build(transport, host)
  }

  /** Starts the reporter with the configured period and unit. */
  override def start: Unit = reporter.start(pollPeriod, pollUnit)

  /** Stops the reporter. */
  override def stop: Unit = reporter.stop()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment