Last active
June 14, 2022 14:10
-
-
Save TomLous/8c4c2b37e7ed44c3ef6300a65b1022f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package functions.spark | |
import java.sql.Timestamp | |
import java.time.Instant | |
import java.util.UUID | |
/** Wrapper around a single incoming record that knows how to turn it into its output form.
  *
  * @tparam Out the product type produced by the conversion
  * @tparam In  the incoming record type, which must be convertible to `Out`
  * @param value the wrapped incoming record
  */
case class CDFRecord[Out <: Product, In <: Convertable[Out]](value: In) {

  /** Converts the wrapped record, stamping it with ingest metadata.
    *
    * Every call captures the current instant and generates a fresh random UUID,
    * so two calls on the same record yield different ingest metadata.
    *
    * @return the converted record as produced by `value.convert`
    */
  def convert: Out =
    value.convert(
      Timestamp.from(Instant.now()), // ingest time: "now" at the moment of conversion
      UUID.randomUUID().toString     // ingest id: random UUID rendered as a string
    )
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package functions.spark | |
import java.sql.Timestamp | |
/** Base type for records that can be converted into another product type.
  *
  * @tparam Out the product type this record converts into
  */
abstract class Convertable[Out <: Product] extends Product {

  /** Builds the output record from this record plus the supplied ingest metadata.
    *
    * @param ingestedAt timestamp to attach to the converted record
    * @param ingestedId identifier to attach to the converted record
    * @return the converted record
    */
  def convert(ingestedAt: Timestamp, ingestedId: String): Out
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package functions.spark | |
import java.sql.Timestamp | |
/** Raw incoming item; field names mirror the keys of the incoming JSON payload.
  *
  * @param RAW_NAME            name of the item
  * @param RAW_VALUE           integer value of the item
  * @param OPTIONAL_RAW_DOUBLE optional floating-point value
  * @param SOME_TIME           timestamp carried by the item
  */
case class InItem(
    RAW_NAME: String,
    RAW_VALUE: Int,
    OPTIONAL_RAW_DOUBLE: Option[Double],
    SOME_TIME: Timestamp
) extends Convertable[OutItem] {

  /** Maps this raw item onto an [[OutItem]].
    *
    * @param ingestedAt ingest time; stored as `ingestedAt.getTime`
    * @param ingestedId ingest identifier; stored unchanged
    * @return the converted item
    */
  override def convert(ingestedAt: Timestamp, ingestedId: String): OutItem = {
    // A missing double becomes decimal zero rather than an absent value.
    val decimalValue: BigDecimal =
      OPTIONAL_RAW_DOUBLE.fold(BigDecimal.decimal(0.0))(raw => BigDecimal.valueOf(raw))

    OutItem(
      name = RAW_NAME,
      int_value = RAW_VALUE,
      decimal = decimalValue,
      timestamp = SOME_TIME.getTime,
      ingested_at = ingestedAt.getTime,
      ingest_id = ingestedId
    )
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package functions.spark | |
/** Converted item, as emitted after ingestion.
  *
  * @param name        item name
  * @param int_value   integer value of the item
  * @param decimal     decimal value of the item
  * @param timestamp   item time as a `Long`
  * @param ingested_at ingest time as a `Long`
  * @param ingest_id   identifier assigned at ingest
  */
case class OutItem(
    name: String,
    int_value: Int,
    decimal: BigDecimal,
    timestamp: Long,
    ingested_at: Long,
    ingest_id: String
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functions.spark.{InItem, OutItem} | |
import org.apache.spark.sql.{Encoders, SparkSession} | |
import org.apache.spark.sql.functions._ | |
import org.json4s._ | |
import org.json4s.jackson.Serialization.{read, write} | |
import scala.reflect.runtime.universe.TypeTag | |
import java.sql.Timestamp | |
import java.time.Instant | |
// Local Spark session for this worksheet (three local worker threads).
val spark: SparkSession =
  SparkSession
    .builder()
    .appName("test3")
    .master("local[3]")
    .config("spark.ui.showConsoleProgress", true)
    .getOrCreate()

// Brings the session's implicit encoders and the `toDF` / `as[...]` syntax into scope.
import spark.implicits._

// json4s formats used by `write` when serialising the sample items to JSON.
implicit val formats: Formats = DefaultFormats
// Sample input items used to build the simulated incoming stream.
val sampleItems = List(
  InItem("A", 1, None, Timestamp.valueOf("2022-06-13 01:00:00")),
  InItem("B", 2, Some(2.3333333333), Timestamp.valueOf("2022-06-13 02:00:00")),
  InItem("C", 3, Some(33.0), Timestamp.valueOf("2022-06-13 03:00:00"))
)

// One row per item: null key, JSON-serialised item as value, fixed topic and partition,
// the item's list index as offset, and the current time as the row timestamp.
val incomingStream = sampleItems.zipWithIndex
  .map { case (item, offset) =>
    (null, write(item), "incoming", 0, offset, Timestamp.from(Instant.now()))
  }
  .toDF("key", "value", "topic", "partition", "offset", "timestamp")

// Print the schema and the untruncated rows of the simulated stream.
incomingStream.printSchema()
incomingStream.show(false)
/* | |
root | |
|-- key: void (nullable = true) | |
|-- value: string (nullable = true) | |
|-- topic: string (nullable = true) | |
|-- partition: integer (nullable = false) | |
|-- offset: integer (nullable = false) | |
|-- timestamp: timestamp (nullable = true) | |
res0: Unit = () | |
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+ | |
|key |value |topic |partition|offset|timestamp | | |
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+ | |
|null|{"RAW_NAME":"A","RAW_VALUE":1,"SOME_TIME":"2022-06-12T23:00:00Z"} |incoming|0 |0 |2022-06-13 15:41:54.079| | |
|null|{"RAW_NAME":"B","RAW_VALUE":2,"OPTIONAL_RAW_DOUBLE":2.3333333333,"SOME_TIME":"2022-06-13T00:00:00Z"}|incoming|0 |1 |2022-06-13 15:41:54.081| | |
|null|{"RAW_NAME":"C","RAW_VALUE":3,"OPTIONAL_RAW_DOUBLE":33.0,"SOME_TIME":"2022-06-13T01:00:00Z"} |incoming|0 |2 |2022-06-13 15:41:54.082| | |
+----+----------------------------------------------------------------------------------------------------+--------+---------+------+-----------------------+ | |
*/ | |
/** Turns a DataFrame of JSON-encoded `In` records into a DataFrame of `Out` records.
  *
  * @tparam Out the output product type
  * @tparam In  the incoming record type, convertible to `Out`
  */
class Transformer[Out <: Product: TypeTag, In <: Convertable[Out]: TypeTag] {

  /** Parses the `value` column as JSON and converts each record.
    *
    * @param incoming DataFrame with a `value` column holding the JSON payload
    * @return a DataFrame with one row per converted `Out` record
    */
  def transform(incoming: DataFrame): DataFrame = {
    // The JSON schema is derived from the `In` product type.
    val inputSchema = Encoders.product[In].schema

    // Keep only the parsed payload, still named `value` so it lines up with CDFRecord's field.
    val parsed = incoming.select(from_json(col("value"), inputSchema).as("value"))

    parsed
      .as[CDFRecord[Out, In]]
      .map(record => record.convert)
      .toDF()
  }
}
// Run the InItem -> OutItem transformation over the simulated stream.
val itemTransformer = new Transformer[OutItem, InItem]
val transformedStream = itemTransformer.transform(incomingStream)

// Print the schema and the untruncated rows of the converted records.
transformedStream.printSchema()
transformedStream.show(truncate = false)
/* | |
root | |
|-- name: string (nullable = true) | |
|-- int_value: integer (nullable = false) | |
|-- decimal: decimal(38,18) (nullable = true) | |
|-- timestamp: long (nullable = false) | |
|-- ingested_at: long (nullable = false) | |
|-- ingest_id: string (nullable = true) | |
res2: Unit = () | |
+----+---------+---------------------+-------------+-------------+------------------------------------+ | |
|name|int_value|decimal |timestamp |ingested_at |ingest_id | | |
+----+---------+---------------------+-------------+-------------+------------------------------------+ | |
|A |1 |0E-18 |1655074800000|1655213536030|1229a835-58e2-4e55-b94e-7443cff5cee4| | |
|B |2 |2.333333333300000000 |1655078400000|1655213536238|25faca42-b4f7-41c2-9936-30791c1c6139| | |
|C |3 |33.000000000000000000|1655082000000|1655213536238|c9abc8b7-2a65-445e-9574-328d557f700e| | |
+----+---------+---------------------+-------------+-------------+------------------------------------+ | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment