@rupeshtr78
Last active November 4, 2020 01:30
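// Requires the spark-sql-kafka-0-10 package on the classpath.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, window}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("kafka-windowed-count").getOrCreate()

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "input")
  .load() // columns: key, value (binary), topic, partition, offset, timestamp, timestampType
  // Keep window state until the max event time seen passes the window end by 10 seconds
  .withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds")
  // Use ONE groupBy; these are alternatives, not a chain:
  //   key only:        .groupBy(col("value").cast("string").as("key"))
  //   tumbling window: .groupBy(window(col("timestamp"), "1 day").as("time"))
  //   sliding window:  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes").as("time"))
  .groupBy(col("value").cast("string").as("key"), window(col("timestamp"), "10 minutes"))
  .agg(count("*").as("cnt"))
  // The Kafka sink needs a string or binary "value" column ("key" is optional)
  .select(col("key"), col("cnt").cast("string").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .outputMode("update") // emit only the rows whose counts changed in this trigger
  .option("checkpointLocation", "/cp/")
  .start()
  .awaitTermination()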
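The tumbling and sliding `window` calls above assign each event to one or more half-open `[start, end)` intervals aligned to the epoch. The plain-Scala sketch below models that assignment so the difference can be checked without a cluster. `WindowAssignment`, `windowsFor` and `tumbling` are hypothetical names, and the code models the documented semantics with `startTime` left at its default of zero; it is not Spark's implementation.

```scala
// Models which [start, end) event-time windows contain a timestamp.
// Hypothetical helper: mirrors the documented semantics of
// window(col, windowDuration, slideDuration) with startTime = 0; it is not Spark code.
object WindowAssignment {
  final case class Window(start: Long, end: Long) // epoch milliseconds

  def windowsFor(t: Long, size: Long, slide: Long): List[Window] = {
    require(size > 0 && slide > 0 && slide <= size, "need 0 < slide <= size")
    // Latest window start at or before t, aligned to multiples of `slide` from the epoch.
    val lastStart = Math.floorDiv(t, slide) * slide
    Iterator
      .iterate(lastStart)(_ - slide)        // step back one slide at a time
      .takeWhile(start => start + size > t) // stop once a window ends at or before t
      .map(start => Window(start, start + size))
      .toList
      .reverse                              // earliest window first
  }

  // A tumbling window is a sliding window whose slide equals its size.
  def tumbling(t: Long, size: Long): List[Window] = windowsFor(t, size, size)

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val t = 7 * minute
    println(tumbling(t, 10 * minute))               // List(Window(0,600000))
    println(windowsFor(t, 10 * minute, 5 * minute)) // List(Window(0,600000), Window(300000,900000))
  }
}
```

An event at minute 7 lands in exactly one 10-minute tumbling window, but in two 10-minute windows that slide every 5 minutes, which is why sliding windows multiply the number of aggregation rows by roughly `windowDuration / slideDuration`.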
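The watermark and the `update` output mode work together: aggregation state for a window is kept until the watermark (the maximum event time seen, minus `delayThreshold`) passes the window's end, and each trigger emits only the rows that changed. The sketch below is a simplified single-process model of that behaviour for tumbling windows. `WatermarkedCounter`, `WatermarkDemo` and their methods are hypothetical, and the rule used for late events (an event is ignored once its whole window is behind the watermark) is an assumption that approximates, but does not exactly reproduce, Spark's guarantees.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of an update-mode count over tumbling
// event-time windows with a watermark. Times are epoch milliseconds.
final class WatermarkedCounter(windowSize: Long, delayThreshold: Long) {
  private val counts = mutable.Map.empty[Long, Long] // window start -> running count
  private var maxEventTime = Long.MinValue
  private var wm = Long.MinValue // watermark applied to the next batch

  def watermark: Long = wm
  def stateSize: Int = counts.size

  /** Processes one micro-batch; returns the rows update mode would emit. */
  def processBatch(eventTimes: Seq[Long]): Map[Long, Long] = {
    val updated = mutable.Map.empty[Long, Long]
    for (t <- eventTimes) {
      val start = Math.floorDiv(t, windowSize) * windowSize
      // Assumed late-data rule: ignore the event once its whole window is behind the watermark.
      if (start + windowSize > wm) {
        val c = counts.getOrElse(start, 0L) + 1
        counts(start) = c
        updated(start) = c
      }
      maxEventTime = math.max(maxEventTime, t)
    }
    // The watermark advances only at the end of a batch and never moves backwards.
    if (maxEventTime != Long.MinValue)
      wm = math.max(wm, maxEventTime - delayThreshold)
    // Evict windows that can no longer change (their end is at or behind the watermark).
    counts.keys.filter(_ + windowSize <= wm).toList.foreach(counts.remove)
    updated.toMap
  }
}

object WatermarkDemo {
  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val counter = new WatermarkedCounter(10 * minute, 10000L) // 10-minute windows, 10 s delay
    println(counter.processBatch(Seq(1 * minute, 2 * minute))) // window 0 updated to 2
    println(counter.processBatch(Seq(11 * minute)))             // new window; window 0 evicted
    println(counter.processBatch(Seq(3 * minute)))              // late event dropped: no rows
  }
}
```

In the demo, the event at minute 11 moves the watermark to 10 min 50 s, so the `[0, 10 min)` window is finalized and evicted; the later event at minute 3 then produces no output. Without `withWatermark`, that window's state would be kept indefinitely.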