Last active
December 13, 2018 17:05
-
-
Save anthavio/a7a856654fbdf3471a1588c008d111d3 to your computer and use it in GitHub Desktop.
Standalone Spark cluster http://master:8080/json/ client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zx.spark

import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.concurrent.TimeUnit

import cats.effect.Sync
import cats.implicits._
import com.fasterxml.jackson.annotation.{JsonFormat, JsonProperty}
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.{DeserializationContext, DeserializationFeature, JsonDeserializer, ObjectMapper}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.softwaremill.sttp._
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try
/**
 * Client for the JSON endpoint of a standalone Spark cluster (http://master:8080/json/).
 *
 * Not available for Mesos, Yarn, ... clusters.
 *
 * @tparam F effect type in which the result is delivered
 */
trait SparkSaAPI[F[_]] {

  /**
   * Fetches the endpoint's current content.
   *
   * @return the decoded [[SaApiResponse]], wrapped in `F`
   */
  def fetch(): F[SaApiResponse]
}
object SparkSaAPI {

  /**
   * Creates a [[SparkSaAPI]] backed by [[DefaultSparkSaAPI]].
   *
   * @param endpointUrl URL of the JSON endpoint to query
   * @param connTimeout connection timeout; 3 seconds when not given
   * @param readTimeout read timeout; 15 seconds when not given
   * @tparam F effect type, which must have a `Sync` instance
   */
  def apply[F[_]: Sync](
      endpointUrl: String,
      connTimeout: Duration = Duration(3, TimeUnit.SECONDS),
      readTimeout: Duration = Duration(15, TimeUnit.SECONDS)
  ): SparkSaAPI[F] = {
    val client = new DefaultSparkSaAPI[F](endpointUrl, connTimeout, readTimeout)
    client
  }
}
/**
 * Top-level document returned by the endpoint.
 *
 * Ignoring fields workers, activedrivers, completedapps as we do not need them.
 *
 * Field names double as JSON property names, so they follow the all-lowercase keys of the
 * payload. `coresUsed` is the one camel-cased field and is therefore bound explicitly to the
 * `coresused` key (the key written by Spark's master JSON protocol); without the binding the
 * key would be treated as an unknown property and the field would silently stay at 0.
 *
 * @param url        value of the `url` key
 * @param cores      value of the `cores` key
 * @param coresUsed  value of the `coresused` key
 * @param memory     value of the `memory` key
 * @param memoryused value of the `memoryused` key
 * @param activeapps entries of the `activeapps` array
 */
case class SaApiResponse(
    url: String,
    cores: Int,
    @JsonProperty("coresused") coresUsed: Int,
    memory: Int,
    memoryused: Int,
    activeapps: List[SaApiSparkApp])
/** State of an application, as carried by the `state` field of [[SaApiSparkApp]]. */
sealed trait SaApiAppState {

  /**
   * Whether this state counts as active.
   *
   * Declared on the base trait so it can be called on any `SaApiAppState` value
   * without first narrowing it to one of the two sub-traits.
   */
  def isActive: Boolean
}

/** Marker for states that count as active. */
sealed trait ActiveSaAppState extends SaApiAppState {
  val isActive = true
}

/** Marker for states that count as inactive. */
sealed trait InactiveSaAppState extends SaApiAppState {
  val isActive = false
}

object SaApiAppState {
  case object RUNNING extends ActiveSaAppState
  case object WAITING extends ActiveSaAppState
  case object FINISHED extends InactiveSaAppState
  case object KILLED extends InactiveSaAppState

  /**
   * Looks up a state by its exact, case-sensitive name.
   *
   * @param stateStr textual state, e.g. `"RUNNING"`
   * @return the matching state, or `None` for any other input (including `null`)
   */
  def fromString(stateStr: String): Option[SaApiAppState] = stateStr match {
    case "RUNNING"  => Some(RUNNING)
    case "WAITING"  => Some(WAITING)
    case "FINISHED" => Some(FINISHED)
    case "KILLED"   => Some(KILLED)
    case _          => None
  }
}
/**
 * One application entry of [[SaApiResponse]]'s `activeapps` list.
 *
 * Field names double as JSON property names.
 *
 * @param starttime  numeric epoch-millisecond value in JSON, converted by [[MillisecondDeserializer]]
 * @param submitdate textual date in JSON, parsed with the pattern `EEE MMM d HH:mm:ss z yyyy`
 * @param duration   decoded by the mapper's `java.time.Duration` support
 */
case class SaApiSparkApp(
    id: String,
    name: String,
    cores: Int,
    user: String,
    memoryperslave: Int,
    state: SaApiAppState,
    @JsonDeserialize(using = classOf[MillisecondDeserializer])
    starttime: LocalDateTime, // 1544355643695
    // The text uses English day/month abbreviations, so the locale is pinned; otherwise the
    // formatter would use the mapper's locale (the JVM default) and fail on non-English JVMs.
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "EEE MMM d HH:mm:ss z yyyy", locale = "en")
    submitdate: LocalDateTime, // "Sun Dec 09 12:40:43 CET 2018"
    duration: java.time.Duration)
/**
 * [[SparkSaAPI]] implementation that performs an HTTP GET through sttp's synchronous
 * `HttpURLConnectionBackend` and decodes the response body with Jackson.
 *
 * @param endpointUrl URL of the JSON endpoint to query
 * @param connTimeout connection timeout handed to the HTTP backend (converted to whole milliseconds)
 * @param readTimeout read timeout applied to every request
 */
class DefaultSparkSaAPI[F[_]: Sync](endpointUrl: String, connTimeout: Duration, readTimeout: Duration)
    extends SparkSaAPI[F]
    with StrictLogging {

  // Mapper setup: unknown JSON properties are ignored rather than rejected.
  private val jackson = new ObjectMapper() with ScalaObjectMapper
  jackson.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  jackson.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false) // DurationDeserializer
  jackson.registerModule(DefaultScalaModule) // Beware https://issues.apache.org/jira/browse/SPARK-22128
  jackson.registerModule(new JavaTimeModule())

  // Extra module carrying the deserializer for the SaApiAppState hierarchy.
  val module = new SimpleModule()
  module.addDeserializer(classOf[SaApiAppState], new SaApiAppStateDeserializer())
  jackson.registerModule(module)

  /** Turns the textual state into a [[SaApiAppState]]; unrecognised text is rejected with an exception. */
  class SaApiAppStateDeserializer extends StdDeserializer[SaApiAppState](classOf[SaApiAppState]) {
    def deserialize(parser: JsonParser, context: DeserializationContext): SaApiAppState = {
      val rawState = parser.getValueAsString
      SaApiAppState.fromString(rawState) match {
        case Some(state) => state
        case None        => throw new IllegalArgumentException(s"$rawState is not a valid state value")
      }
    }
  }

  private implicit val backend: SttpBackend[Id, Nothing] = {
    val connectTimeout = FiniteDuration(connTimeout.toMillis, TimeUnit.MILLISECONDS)
    HttpURLConnectionBackend(options = SttpBackendOptions.connectionTimeout(connectTimeout))
  }

  /** Decodes `content` into `B`, capturing any exception thrown by the mapper as a `Left`. */
  def parseJson[B: Manifest](content: String): Either[Throwable, B] = {
    val attempt = Try(jackson.readValue[B](content))
    Either.fromTry(attempt)
  }

  /** sttp response handler: reads the body as a string and decodes it with [[parseJson]]. */
  def asJson[B: Manifest]: ResponseAs[Either[Throwable, B], Nothing] =
    asString.map(body => parseJson[B](body))

  /**
   * Suspends the GET request in `F`. A response whose body is a `Left` is raised as a
   * `RuntimeException` carrying status and body; a decoding failure is raised as the
   * captured exception.
   */
  def fetch(): F[SaApiResponse] =
    Sync[F].defer {
      val request =
        sttp
          .get(uri"$endpointUrl")
          .readTimeout(readTimeout)
          .response(asJson[SaApiResponse])
      logger.debug(s"Requesting ${request.method} ${request.uri}")
      // The backend's effect is Id, so send() yields the response directly.
      unwrap(request.send())
    }

  /** Lifts the two failure layers of the response (error body, decoding error) into `F`. */
  private def unwrap(response: Response[Either[Throwable, SaApiResponse]]): F[SaApiResponse] =
    response.body match {
      case Right(decoded) =>
        Sync[F].fromEither(decoded)
      case Left(errorBody) =>
        val message =
          s"Unexpected ${response.code} ${response.statusText} response from $endpointUrl :\n$errorBody"
        Sync[F].raiseError(new RuntimeException(message))
    }
}
/**
 * Jackson deserializer that reads an epoch-millisecond value, supplied either as a JSON
 * integer or as a string of digits, and converts it to a [[LocalDateTime]] in `zoneId`.
 *
 * @param zoneId zone used to turn the instant into a local date-time
 */
class MillisecondDeserializer(val zoneId: ZoneId) extends JsonDeserializer[LocalDateTime] {

  /** No-argument constructor; uses the UTC zone. */
  def this() = this(ZoneId.of("UTC"))

  /**
   * @throws NumberFormatException if a string token does not hold a parsable long
   * @return the local date-time for the millisecond value under the parser's current token;
   *         any token other than a string or an integer number is rejected with the
   *         context's wrong-token exception
   */
  override def deserialize(parser: JsonParser, ctxt: DeserializationContext): LocalDateTime =
    parser.getCurrentToken match {
      case JsonToken.VALUE_NUMBER_INT =>
        toDate(parser.getLongValue)
      case JsonToken.VALUE_STRING =>
        val digits = parser.getText.trim
        toDate(java.lang.Long.parseLong(digits))
      case _ =>
        throw ctxt.wrongTokenException(parser, JsonToken.VALUE_STRING, "Expected a string or numeric value")
    }

  // Epoch milliseconds -> local date-time in the configured zone.
  private def toDate(millis: Long) = LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), zoneId)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment