@tecmaverick · Created November 24, 2022 07:30
Spark Kafka Processing with NULL record validation and replacement
package org.ajp.kafkademo
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
object kafkademo extends Serializable {

  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {

    // Schema for the JSON payload carried in the Kafka record value;
    // every field is read as a string and validated downstream
    val schema = StructType(List(
      StructField("fullname", StringType),
      StructField("sex", StringType),
      StructField("address", StringType),
      StructField("personal", StructType(List(
        StructField("dob", StringType),
        StructField("age", StringType)
      )))
    ))
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName("Streaming Files")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .config("spark.sql.streaming.schemaInference", "true")
      .config("spark.sql.shuffle.partitions", 3)
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .getOrCreate()
    // startingOffsets = "earliest" replays the topic from the beginning on the first run;
    // subsequent runs resume from the offsets recorded in the checkpoint
    val rawDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test2")
      .option("startingOffsets", "earliest")
      .load()
    // rawDF.printSchema()

    // The Kafka value column is binary: cast it to string and parse it as JSON.
    // Fields that are missing or fail to parse come back as NULL.
    val newDF = rawDF.select(from_json(col("value").cast("string"), schema).alias("value"))
    // Flatten the struct and replace NULLs with sentinel values:
    // 'XXXX' for strings, '1900-01-01' for missing or unparseable dates, 0 for age
    val json_flattened = newDF.selectExpr(
      "(case when value.fullname is null then 'XXXX' else value.fullname end) as fullname",
      "(case when value.sex is null then 'XXXX' else value.sex end) as sex",
      "(case when value.address is null then 'XXXX' else value.address end) as address",
      "(case when to_date(value.personal.dob, 'yyyy/MM/dd') is null then '1900-01-01' else to_date(value.personal.dob, 'yyyy/MM/dd') end) as dob",
      "(case when value.personal.age is null then 0 else cast(value.personal.age as int) end) as age")
    // Write the flattened records as JSON files, one micro-batch per minute.
    // start() returns a StreamingQuery; block on it with awaitTermination()
    val invoiceWriterQuery = json_flattened.writeStream
      .format("json")
      .queryName("Flattened Invoice Writer")
      .outputMode("append")
      .option("path", "output")
      .option("checkpointLocation", "chk-point-dir")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    invoiceWriterQuery.awaitTermination()
  }
}
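
A quick way to sanity-check the NULL-replacement logic without a running Kafka broker is to apply the same schema and CASE expressions to a small in-memory batch DataFrame. The sketch below is not part of the original gist: the object name and sample records are illustrative assumptions, while the schema and expressions are copied verbatim from the job above.

package org.ajp.kafkademo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object NullReplacementCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("NullReplacementCheck")
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .getOrCreate()
    import spark.implicits._

    // Same schema as the streaming job
    val schema = StructType(List(
      StructField("fullname", StringType),
      StructField("sex", StringType),
      StructField("address", StringType),
      StructField("personal", StructType(List(
        StructField("dob", StringType),
        StructField("age", StringType)
      )))
    ))

    // One well-formed record and one with missing/invalid fields
    val samples = Seq(
      """{"fullname":"Jane Doe","sex":"F","address":"1 Main St","personal":{"dob":"1990/05/17","age":"32"}}""",
      """{"fullname":null,"sex":"M","address":null,"personal":{"dob":"bad-date","age":null}}"""
    ).toDF("value")

    val parsed = samples.select(from_json(col("value"), schema).alias("value"))

    // Same CASE expressions as the streaming job
    val flattened = parsed.selectExpr(
      "(case when value.fullname is null then 'XXXX' else value.fullname end) as fullname",
      "(case when value.sex is null then 'XXXX' else value.sex end) as sex",
      "(case when value.address is null then 'XXXX' else value.address end) as address",
      "(case when to_date(value.personal.dob, 'yyyy/MM/dd') is null then '1900-01-01' else to_date(value.personal.dob, 'yyyy/MM/dd') end) as dob",
      "(case when value.personal.age is null then 0 else cast(value.personal.age as int) end) as age")

    // The second row should come out as XXXX / M / XXXX / 1900-01-01 / 0
    flattened.show(truncate = false)

    spark.stop()
  }
}

Running this locally shows the sentinel substitutions applied to the malformed record, which is the same behaviour the streaming query applies to each micro-batch.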