class KafkaSinkBatchSuiteV2
...
// Batch-write smoke test: creates a fresh topic, builds a DataFrame of 100000
// (topic, value) string rows with random integer payloads, and writes it through
// the "kafka" data source in append mode. The test contains no explicit
// assertions; it passes when `save()` completes without throwing.
test("single node batch") {
  val targetTopic = newTopic()
  testUtils.createTopic(targetTopic)

  // Every row carries the topic name plus a random integer rendered as a string.
  val generator = new Random()
  val rows = Seq.fill(100000) {
    Row(targetTopic, generator.nextInt().toString)
  }

  // Two string columns: "topic" and "value".
  val schema = new StructType()
    .add("topic", StringType)
    .add("value", StringType)
  val frame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

  // The topic is supplied both per row (the "topic" column) and via the writer option.
  val kafkaWriter = frame.write
    .mode(SaveMode.Append)
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("topic", targetTopic)
  kafkaWriter.save()
}
...
Last active October 14, 2019 13:30
-
-
Save gaborgsomogyi/4634da1b6f1689ce32b2efcfcbf482d7 to your computer and use it in GitHub Desktop.
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.