class KafkaSinkStreamingSuite
...
test("single node streaming") {
  // In-memory streaming source that feeds the Kafka writer under test.
  val source = MemoryStream[String]
  val topic = newTopic()
  testUtils.createTopic(topic)

  // No writer-level topic is passed (withTopic = None); instead the select
  // expression adds a literal `topic` column alongside `value`.
  val query = createKafkaWriter(
    source.toDF(),
    withTopic = None,
    withOutputMode = Some(OutputMode.Append))(
    withSelectExpr = s"'$topic' as topic", "value")

  try {
    // Push 10000 records, one addData call per record, then block until
    // everything added so far has been processed.
    (0 until 10000).foreach { n =>
      source.addData(n.toString)
    }
    query.processAllAvailable()
  } finally {
    // Stop the query whether or not processing succeeded.
    // NOTE(review): the test makes no assertions on the topic contents; it
    // only checks that processing completes without throwing.
    query.stop()
  }
}
...
Last active
October 14, 2019 15:01
-
-
Save gaborgsomogyi/a5ed370e009b65a716d85f642fad21d9 to your computer and use it in GitHub Desktop.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment