./bin/spark-shell \
  --packages org.apache.spark:spark-avro_2.11:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --driver-memory 8g --executor-memory 9g \
  --jars ~/Documents/personal/projects/nov26/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.10.0-SNAPSHOT.jar \
  --conf spark.driver.extraJavaOptions="-Dlog4j.configuration=file:/Users/nsb/Documents/personal/tools/log4j/debug_hudi_log4j.properties" \
  --conf spark.executor.extraJavaOptions="-Dlog4j.configuration=file:/Users/nsb/Documents/personal/tools/log4j/debug_hudi_log4j.properties"
// Define the Kafka source stream
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)
// Project Kafka metadata into typed columns and derive a date-based partition column
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date")
import java.time.LocalDateTime
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger
// Define the Hudi streaming sink: a COPY_ON_WRITE table keyed by kafka_partition_offset,
// partitioned by partition_date, with async clustering enabled on the write client.
val writer = df.
  writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "kafka_timestamp").
  option(RECORDKEY_FIELD.key, "kafka_partition_offset").
  option(PARTITIONPATH_FIELD.key, "partition_date").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  // Clustering plan: ~10 MB target file size, ~50 MB small-file limit, sorted by partition_date and key
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "10485760").
  option("hoodie.clustering.plan.strategy.small.file.limit", "50000000").
  option("hoodie.clustering.plan.strategy.sort.columns", "partition_date,kafka_partition_offset").
  // Schedule async clustering every 2 commits and allow updates to file groups pending clustering
  option("hoodie.clustering.async.max.commits", "2").
  option("hoodie.datasource.clustering.async.enable", "true").
  option("hoodie.clustering.async.enabled", "true").
  option("hoodie.clustering.updates.strategy", "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())
writer.trigger(Trigger.ProcessingTime(60000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
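// Optional verification (not part of the original gist), assuming the streaming query above is
// running and writing to /tmp/hudi_streaming_kafka/COPY_ON_WRITE: read the table back with the
// Hudi datasource and check row counts per partition. Once async clustering has run, the
// timeline under .hoodie/ should also contain replacecommit instants.
val hudiDf = spark.read.format("org.apache.hudi").load("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
hudiDf.groupBy("partition_date").count().show(false)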