Last active
January 24, 2018 16:28
-
-
Save jlafall/e2c0d88428ebf7d9e6cc6b53818b570d to your computer and use it in GitHub Desktop.
Apache Spark Structured Streaming word count using Kafka as the source and Couchbase as the sink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions._ | |
object WordCountKafkaCouchbase {

  /**
   * Streaming word count: reads records from the Kafka topic "words",
   * splits each value on whitespace, maintains a running count per word,
   * and upserts the counts into the Couchbase "words" bucket, keyed by
   * the word itself.
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark session with Couchbase connector settings.
    val spark = SparkSession
      .builder
      .appName("Word Count Test")
      .config("spark.couchbase.username", "[username goes here]")
      .config("spark.couchbase.password", "[password goes here]")
      .config("spark.couchbase.nodes", "localhost")
      .config("spark.couchbase.bucket.words", "")
      .getOrCreate()

    import spark.implicits._

    // Kafka source: key/value arrive as binary, so cast both to STRING
    // before treating the stream as a typed Dataset of (key, value).
    val kafkaDS = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "words")
      // Keep the query alive even if requested Kafka offsets are gone.
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Tokenize each message value on runs of whitespace; the key is ignored.
    val wordDF = kafkaDS
      .flatMap { case (_, value) => value.split("""\s+""") }
      .toDF("word")

    // Running aggregate: occurrences per word across the whole stream.
    val wordCountDF = wordDF
      .groupBy("word")
      .agg(count("word").as("count"))

    // Couchbase sink: "idField" makes the word the document id, so each
    // micro-batch in "update" mode upserts only the changed counts.
    val query = wordCountDF.writeStream
      .format("com.couchbase.spark.sql")
      .outputMode("update")
      .option("checkpointLocation", "/home/vagrant/checkpoint")
      .option("idField", "word")
      .start()

    // Block the driver until the streaming query stops or fails.
    query.awaitTermination()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.LocalDateTime | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.functions._ | |
object WordWithDateTimeKafkaCouchbase {

  /**
   * Streaming word tagger: reads records from the Kafka topic "words",
   * splits each value on whitespace, pairs every word with the current
   * timestamp, and upserts the (word, date) documents into the Couchbase
   * "words" bucket, keyed by the word itself.
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark session with Couchbase connector settings.
    val spark = SparkSession
      .builder
      .appName("Word Count Test")
      .config("spark.couchbase.username", "[username goes here]")
      .config("spark.couchbase.password", "[password goes here]")
      .config("spark.couchbase.nodes", "localhost")
      .config("spark.couchbase.bucket.words", "")
      .getOrCreate()

    import spark.implicits._

    // Kafka source: key/value arrive as binary, so cast both to STRING
    // before treating the stream as a typed Dataset of (key, value).
    val kafkaDS = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "words")
      // Keep the query alive even if requested Kafka offsets are gone.
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Tag each whitespace-separated word with the wall-clock time of
    // processing; the Kafka key is ignored.
    // NOTE(review): LocalDateTime.now() is non-deterministic — replayed
    // micro-batches will produce different timestamps. Consider Kafka's
    // record timestamp or current_timestamp() if reproducibility matters.
    val wordDF = kafkaDS
      .flatMap { case (_, value) =>
        value.split("""\s+""").map(word => (word, LocalDateTime.now().toString))
      }
      .toDF("word", "date")

    // Couchbase sink: "idField" makes the word the document id, so each
    // micro-batch in "update" mode upserts the latest (word, date) pair.
    val query = wordDF.writeStream
      .format("com.couchbase.spark.sql")
      .outputMode("update")
      .option("checkpointLocation", "/home/vagrant/checkpoint")
      .option("idField", "word")
      .start()

    // Block the driver until the streaming query stops or fails.
    query.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment