Skip to content

Instantly share code, notes, and snippets.

@gaborgsomogyi
Last active October 14, 2019 13:30
Show Gist options
  • Save gaborgsomogyi/4634da1b6f1689ce32b2efcfcbf482d7 to your computer and use it in GitHub Desktop.
Save gaborgsomogyi/4634da1b6f1689ce32b2efcfcbf482d7 to your computer and use it in GitHub Desktop.
class KafkaSinkBatchSuiteV2
...
  test("single node batch") {
    val topic = newTopic()
    testUtils.createTopic(topic)
    val rand = new Random()
    val data = Seq.fill(100000)(Row(topic, rand.nextInt().toString))

    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data),
      StructType(Seq(StructField("topic", StringType), StructField("value", StringType)))
    )

    df.write
      .mode(SaveMode.Append)
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("topic", topic)
      .save()
  }
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment