Skip to content

Instantly share code, notes, and snippets.

@globulon
Created January 16, 2018 09:20
Show Gist options
  • Select an option

  • Save globulon/056b1c6b9cc3f83e177531b39214f745 to your computer and use it in GitHub Desktop.

Select an option

Save globulon/056b1c6b9cc3f83e177531b39214f745 to your computer and use it in GitHub Desktop.
package nl.autotrack.data.batches.metrics
import java.time.format.DateTimeFormatter
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.persogroep.aws.kinesis.{AWSStreamProxy, RegionNameIdentification}
import nl.autotrack.data.metrics.raw.{DateFormats, Formats, MetricRequest, OwnerLenses, OwnerTarget, Setting}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import spray.json._
import scala.language.{higherKinds, implicitConversions, postfixOps}
import scalaz.{-\/, \/-}
/**
 * Spark Structured Streaming job that counts metric requests whose target is an
 * [[OwnerTarget]].
 *
 * Records are read as JSON from a fixed S3 prefix, filtered, grouped and counted,
 * and the resulting counts are written as parquet to a fixed S3 path.
 */
object ProjectDealerMetrics extends ParseConfig
  with RegionNameIdentification with Formats with OwnerLenses with DateFormats {

  /** Pair of an integer and a metric request. */
  protected type MetricStat = (Int, MetricRequest)

  /** spray-json format for the three-field `AggregatedDealerView`. */
  implicit protected final def aggregateFormat: RootJsonFormat[AggregatedDealerView] =
    jsonFormat3(AggregatedDealerView.apply)

  /**
   * Entry point: parses the command-line configuration and runs the job.
   *
   * On a valid configuration the streaming query is started and the driver blocks
   * until that query terminates. On an invalid configuration every reported error
   * is printed to stderr and the process then exits with status 1.
   *
   * @param args command-line arguments handed to `readBatchConfig`
   */
  def main(args: Array[String]): Unit =
    readBatchConfig(args) match {
      case \/-(cfg) ⇒
        // Block on the query: `start()` only launches it, so returning from `main`
        // right away would end the driver before any data is processed.
        run(cfg).awaitTermination()
      case -\/(errs) ⇒
        // Report all errors first; exiting inside the loop would hide every error
        // after the first one.
        errs foreach { err ⇒ System.err.println(err) }
        System.exit(1)
    }

  /**
   * Builds and starts the streaming aggregation.
   *
   * Each retained request is keyed by
   * `(owner id, typ, src, chnl, ISO-formatted local date of t)` and the number of
   * requests per key is counted.
   *
   * NOTE(review): `cfg` is not used yet; the input and output S3 paths are hard-coded.
   * NOTE(review): Spark's Structured Streaming guide restricts append mode on
   * aggregations to watermarked queries, and file sinks normally need a
   * `checkpointLocation`; neither is configured here, nor is an explicit source
   * schema — confirm this query actually starts in the target environment.
   *
   * @param cfg parsed batch configuration (currently unused)
   * @return handle of the started streaming query
   */
  private def run(cfg: BatchConfig): org.apache.spark.sql.streaming.StreamingQuery = {
    val spark: SparkSession = SparkSession.builder().appName("ProjectDealerMetrics").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed

    spark.readStream
      .json("s3://autotrack.kinesis/autotrack-metrics-prod/2018/01/16/08/*")
      .as[MetricRequest]
      .filter { selectDealerMetric }
      .groupByKey {
        // The preceding filter only lets `OwnerTarget` requests through, so this
        // single case covers every record that reaches the grouping.
        case MetricRequest(typ, t, _, Setting(src, chnl, _), OwnerTarget(o)) ⇒
          (o.id, typ, src, chnl, t.toLocalDate.format(DateTimeFormatter.ISO_DATE))
      }
      .agg(typed.count(_ ⇒ ()))
      .writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "s3://autotrack.kinesis/autotrack-metrics-prod/2018/01/16/dealer_metric_aggregated_08_minutes")
      .start()
  }

  /** Predicate that is true exactly for requests whose target is an [[OwnerTarget]]. */
  private def selectDealerMetric: MetricRequest ⇒ Boolean = {
    case MetricRequest(_, _, _, _, OwnerTarget(_)) ⇒ true
    case _ ⇒ false
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment