Created
January 16, 2018 09:20
-
-
Save globulon/056b1c6b9cc3f83e177531b39214f745 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package nl.autotrack.data.batches.metrics | |
| import java.time.format.DateTimeFormatter | |
| import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration | |
| import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder | |
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream | |
| import com.persogroep.aws.kinesis.{AWSStreamProxy, RegionNameIdentification} | |
| import nl.autotrack.data.metrics.raw.{DateFormats, Formats, MetricRequest, OwnerLenses, OwnerTarget, Setting} | |
| import org.apache.spark.SparkConf | |
| import org.apache.spark.sql.SparkSession | |
| import org.apache.spark.storage.StorageLevel | |
| import org.apache.spark.streaming._ | |
| import org.apache.spark.streaming.kinesis.KinesisInputDStream | |
| import spray.json._ | |
| import scala.language.{higherKinds, implicitConversions, postfixOps} | |
| import scalaz.{-\/, \/-} | |
/** Batch/streaming job that aggregates raw dealer metric requests from S3
  * and writes per-(owner, type, source, channel, day) counts as parquet.
  */
object ProjectDealerMetrics extends ParseConfig
    with RegionNameIdentification with Formats with OwnerLenses with DateFormats {

  /** Count paired with the request it was derived from. */
  protected type MetricStat = (Int, MetricRequest)

  /** spray-json format for the aggregated view produced by this job. */
  implicit protected final def aggregateFormat: RootJsonFormat[AggregatedDealerView] =
    jsonFormat3(AggregatedDealerView.apply)

  /** Entry point: parse the batch configuration and run the job, or print
    * every configuration error to stderr and exit with a non-zero status.
    */
  def main(args: Array[String]): Unit =
    readBatchConfig(args) match {
      case \/-(cfg) ⇒ run(cfg)
      case -\/(errs) ⇒
        // Report ALL errors before exiting. The original called System.exit(1)
        // inside the foreach, so only the first error was ever printed.
        errs foreach { err ⇒ System.err.println(err) }
        System.exit(1)
    }

  /** Reads metric requests as a stream from S3, keeps only dealer-owned
    * metrics, counts them per (ownerId, type, source, channel, ISO date) and
    * streams the result to a parquet sink. Blocks until the query terminates.
    *
    * TODO(review): the source/sink paths and date are hard-coded; presumably
    * they should come from `cfg` — confirm BatchConfig's fields and use them.
    */
  private def run(cfg: BatchConfig): Unit = {
    val spark: SparkSession =
      SparkSession.builder().appName("ProjectDealerMetrics").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed

    val outputPath =
      "s3://autotrack.kinesis/autotrack-metrics-prod/2018/01/16/dealer_metric_aggregated_08_minutes"

    // NOTE(review): a streaming aggregation in "append" output mode normally
    // requires a watermark on the event-time column; without one Spark rejects
    // the query. Confirm which MetricRequest field carries event time and add
    // `.withWatermark(...)` accordingly.
    val query = spark.readStream
      .json("s3://autotrack.kinesis/autotrack-metrics-prod/2018/01/16/08/*")
      .as[MetricRequest]
      .filter { selectDealerMetric }
      .groupByKey {
        case MetricRequest(typ, t, _, Setting(src, chnl, _), OwnerTarget(o)) ⇒
          (o.id, typ, src, chnl, t.toLocalDate.format(DateTimeFormatter.ISO_DATE))
      }
      .agg(typed.count(_ ⇒ ()))
      .writeStream
      .outputMode("append")
      .format("parquet")
      // The file (parquet) sink requires a checkpoint location; without this
      // option start() fails at runtime.
      .option("checkpointLocation", s"$outputPath/_checkpoints")
      .option("path", outputPath)
      .start()

    // Keep the driver alive until the streaming query finishes; otherwise
    // main returns and the application exits while the query is still running.
    query.awaitTermination()
  }

  /** Predicate that selects only metric requests targeting an owner (dealer). */
  private def selectDealerMetric: MetricRequest ⇒ Boolean = {
    case MetricRequest(_, _, _, _, OwnerTarget(_)) ⇒ true
    case _ ⇒ false
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.