Spark Kafka Processing with NULL record validation and replacement
package org.ajp.kafkademo

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object kafkademo extends Serializable {

  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
  def main(args: Array[String]): Unit = {

    // Explicit schema for the incoming JSON person records; dob and age
    // arrive as strings and are converted during flattening.
    val schema = StructType(List(
      StructField("fullname", StringType),
      StructField("sex", StringType),
      StructField("address", StringType),
      StructField("personal", StructType(List(
        StructField("dob", StringType),
        StructField("age", StringType)
      )))
    ))
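
    // Illustrative sample record for this schema (not part of the original gist):
    //   {"fullname":"Jane Doe","sex":"F","address":null,"personal":{"dob":"1990/05/17","age":"32"}}
    // Fields that are missing or null parse to NULL columns and are replaced below.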
    val spark = SparkSession.builder()
      .master("local[3]")
      .appName("Kafka Streaming Demo")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .config("spark.sql.streaming.schemaInference", "true")
      .config("spark.sql.shuffle.partitions", 3)
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
      // LEGACY restores pre-Spark-3.0 (SimpleDateFormat) behaviour for to_date parsing.
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .getOrCreate()
    // Subscribe to the test2 topic; startingOffsets=earliest replays the topic
    // from the beginning on the first run (ignored once a checkpoint exists).
    val rawDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test2")
      .option("startingOffsets", "earliest")
      .load()
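
    // The Kafka source has a fixed schema: key and value arrive as binary,
    // alongside topic, partition, offset, timestamp and timestampType columns.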
    // rawDF.printSchema()

    // Cast the binary value column to a string and parse it against the explicit schema.
    val newDF = rawDF.select(from_json(col("value").cast("string"), schema).alias("value"))
    // Flatten the struct, replacing NULLs: strings fall back to 'XXXX',
    // dob to 1900-01-01, and age to 0.
    val json_flattened = newDF.selectExpr(
      "(case when value.fullname is null then 'XXXX' else value.fullname end) as fullname",
      "(case when value.sex is null then 'XXXX' else value.sex end) as sex",
      "(case when value.address is null then 'XXXX' else value.address end) as address",
      "(case when to_date(value.personal.dob, 'yyyy/MM/dd') is null then '1900-01-01' else to_date(value.personal.dob, 'yyyy/MM/dd') end) as dob",
      "(case when value.personal.age is null then 0 else cast(value.personal.age as int) end) as age")
    // Write the flattened records as JSON files every minute, checkpointing
    // to chk-point-dir so offsets survive restarts.
    val writerQuery = json_flattened.writeStream
      .format("json")
      .queryName("Flattened Person Writer")
      .outputMode("append")
      .option("path", "output")
      .option("checkpointLocation", "chk-point-dir")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    // Block the driver until the streaming query terminates.
    writerQuery.awaitTermination()
  }
}
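
To try the job end to end, publish a JSON record to the test2 topic and watch the output directory. Below is a minimal producer sketch using the standard Kafka Java client; the kafka-clients dependency, the object name, and the sample values are assumptions for illustration, not part of the original gist:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // A null address and missing dob exercise the replacement logic above.
    val payload = """{"fullname":"Jane Doe","sex":"F","address":null,"personal":{"age":"32"}}"""
    producer.send(new ProducerRecord[String, String]("test2", payload))
    producer.close()
  }
}

With that record, the JSON file written under output/ should contain a line roughly like {"fullname":"Jane Doe","sex":"F","address":"XXXX","dob":"1900-01-01","age":32}.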