Skip to content

Instantly share code, notes, and snippets.

@buildlackey
Last active August 24, 2019 03:37
Show Gist options
  • Save buildlackey/d7caed6bdf0a689c61e42ecc142b404b to your computer and use it in GitHub Desktop.
Save buildlackey/d7caed6bdf0a689c61e42ecc142b404b to your computer and use it in GitHub Desktop.
Streaming inner joins unit test
// Exercises a stream-to-stream inner join declared without a watermark or a time-range condition.
// A background producer feeds both MemoryStreams for about 60 seconds. The join results collected
// through RowProcessor are then read from TestedValuesContainer to check that a main event is still
// matched by a joined event that arrives several seconds after the first match for the same key.
it should "output the result as soon as it arrives without watermark" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)

  val stream = mainEventsStream.toDS().join(joinedEventsStream.toDS(), $"mainKey" === $"joinedKey")

  val query = stream.writeStream.foreach(RowProcessor).start()

  // Sleep between polls instead of spinning on a hot loop while the query starts up.
  while (!query.isActive) {
    Thread.sleep(10L)
  }

  val producer = new Thread(new Runnable() {
    override def run(): Unit = {
      var key = 0
      try {
        // The loop ends when the test interrupts this thread, so the producer does not outlive the test.
        while (!Thread.currentThread().isInterrupted) {
          // Here main events are always sent before the joined ones.
          // We also send an event for key - 10 in order to see if the main event is still kept
          // in the state store.
          joinedEventsStream.addData(Events.joined(s"key${key-10}"))
          val mainEventTime = System.currentTimeMillis()
          mainEventsStream.addData(MainEvent(s"key${key}", mainEventTime, new Timestamp(mainEventTime)))
          Thread.sleep(1000L)
          joinedEventsStream.addData(Events.joined(s"key${key}"))
          key += 1
        }
      } catch {
        case _: InterruptedException => // interrupted during sleep: the test asked us to stop
      }
    }
  })
  // Daemon thread: even if the test aborts abnormally, the producer cannot keep the JVM alive.
  producer.setDaemon(true)
  producer.start()

  try {
    query.awaitTermination(60000)
  } finally {
    // Stop feeding data first, then stop the query, so that the collected values are no longer
    // being written while the assertions below read them.
    producer.interrupt()
    producer.join(5000L)
    query.stop()
  }

  // As you can see in this test, when neither watermark nor range condition is defined, the state isn't cleared.
  // That's why we can see data arriving 9/10 seconds after the first joined event of the same key.
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  val keysWith2Entries = groupedByKeys.filter(keyWithEntries => keyWithEntries._2.size == 2)

  // Guard against a vacuous pass: without at least one key joined twice nothing below is verified.
  withClue("no key produced two join results, so the state retention could not be verified: ") {
    keysWith2Entries.isEmpty shouldBe false
  }

  keysWith2Entries.foreach { case (key, entries) =>
    val metric1 = entries(0)
    val metric2 = entries(1)
    // Validate that the two join results differ by between 9 and 10 seconds (whole seconds).
    val diffBetweenEvents = metric2.joinedEventMillis - metric1.joinedEventMillis
    val timeDiffSecs = diffBetweenEvents/1000
    withClue(s"key=$key, diffBetweenEvents=$diffBetweenEvents ms: ") {
      (timeDiffSecs >= 9 && timeDiffSecs <= 10) shouldBe true
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment