./bin/spark-shell \
  --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --driver-memory 8g --executor-memory 9g \
  --jars ~/Documents/personal/projects/nov26/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.10.0-SNAPSHOT.jar \
  --conf spark.driver.extraJavaOptions="-Dlog4j.configuration=file:/Users/nsb/Documents/personal/tools/log4j/debug_hudi_log4j.properties" \
  --conf spark.executor.extraJavaOptions="-Dlog4j.configuration=file:/Users/nsb/Documents/personal/tools/log4j/debug_hudi_log4j.properties"
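// Optional: seed a handful of test records into the "impressions" topic before starting the
// stream. This is a minimal sketch, assuming a Kafka broker on localhost:9092 and that the
// topic exists (or auto topic creation is enabled); the keys and values are illustrative only.
val sample = spark.range(0, 10).
  selectExpr("CAST(id AS STRING) AS key", "CONCAT('impression-', CAST(id AS STRING)) AS value")
sample.write.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("topic", "impressions").
  save()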
// Define the Kafka source stream
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)
// Project the Kafka metadata and payload into named string columns, then derive the
// record key (partition-offset) and a partition_date column from the ingestion time
val df = dataStreamReader.load().
  selectExpr(
    "topic AS kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date")
import java.time.LocalDateTime
import scala.collection.JavaConversions._
// DataFrame and SaveMode are needed for the foreachBatch signature and SaveMode.Append below
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.ProcessingTime
// Create and start the streaming query; each micro-batch is written to a Hudi COPY_ON_WRITE table
val query = df
  .writeStream
  .queryName("demo")
  .foreachBatch { (batchDF: DataFrame, _: Long) => {
    batchDF.persist()
    println(LocalDateTime.now() + " start writing cow table")
    batchDF.write.format("org.apache.hudi")
      .option(TABLE_TYPE.key, "COPY_ON_WRITE")
      .option(PRECOMBINE_FIELD.key, "kafka_timestamp")
      // Use the kafka partition and offset as the combined record key
      .option(RECORDKEY_FIELD.key, "kafka_partition_offset")
      // Partition by the current date
      .option(PARTITIONPATH_FIELD.key, "partition_date")
      .option(TABLE_NAME.key, "copy_on_write_table")
      .option(HIVE_SYNC_ENABLED.key, false)
      .option(HIVE_STYLE_PARTITIONING.key, true)
      .option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false)
      .option(STREAMING_IGNORE_FAILED_BATCH.key, false)
      .option(STREAMING_RETRY_CNT.key, 0)
      .option("hoodie.table.name", "copy_on_write_table")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
    println(LocalDateTime.now() + " finish")
    batchDF.unpersist()
  }
  }
  .option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/")
  .start()
query.awaitTermination()
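// To verify the writes, the table can be read back in batch mode from a separate spark-shell
// session (query.awaitTermination() blocks this one). A minimal sketch; the path matches the
// save() location used above.
val readDf = spark.read.format("org.apache.hudi").load("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
readDf.groupBy("partition_date").count().show(false)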