Last active
August 24, 2019 03:37
-
-
Save buildlackey/d7caed6bdf0a689c61e42ecc142b404b to your computer and use it in GitHub Desktop.
Streaming inner joins unit test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Verifies that a stream-to-stream inner join with neither a watermark nor a time-range
// condition keeps the main events buffered: a joined event sent for a key roughly 10 seconds
// after its main event must still produce a join result.
it should "output the result as soon as it arrives without watermark" in {
  val mainEventsStream = new MemoryStream[MainEvent](1, sparkSession.sqlContext)
  val joinedEventsStream = new MemoryStream[JoinedEvent](2, sparkSession.sqlContext)
  val stream = mainEventsStream.toDS().join(joinedEventsStream.toDS(), $"mainKey" === $"joinedKey")
  val query = stream.writeStream.foreach(RowProcessor).start()

  // Bounded wait instead of an empty spin loop: do not burn a core, and do not hang forever
  // if the query terminated right after start().
  val startDeadline = System.currentTimeMillis() + 10000L
  while (!query.isActive && System.currentTimeMillis() < startDeadline) {
    Thread.sleep(50L)
  }

  val producer = new Thread(new Runnable() {
    override def run(): Unit = {
      var key = 0
      try {
        while (!Thread.currentThread().isInterrupted) {
          // Main events are always sent before the joined event of the same key.
          // A joined event for key - 10 is also sent on every iteration, in order to see
          // whether the main event emitted ~10 iterations earlier is still kept in the state store.
          joinedEventsStream.addData(Events.joined(s"key${key-10}"))
          val mainEventTime = System.currentTimeMillis()
          mainEventsStream.addData(MainEvent(s"key${key}", mainEventTime, new Timestamp(mainEventTime)))
          Thread.sleep(1000L)
          joinedEventsStream.addData(Events.joined(s"key${key}"))
          key += 1
        }
      } catch {
        case _: InterruptedException => // the test body is done with the producer; just exit
      }
    }
  })
  // Daemon, so a forgotten producer can never keep the test JVM alive.
  producer.setDaemon(true)

  try {
    producer.start()
    query.awaitTermination(60000)
  } finally {
    // Stop feeding data first, then release the streaming query; neither must outlive the test.
    producer.interrupt()
    producer.join(2000L)
    query.stop()
  }

  // As this test shows, when neither a watermark nor a range condition is defined, the state isn't cleared.
  // That is why a second result shows up 9-10 seconds after the first joined event of the same key.
  val groupedByKeys = TestedValuesContainer.values.groupBy(testedValues => testedValues.key)
  val keysWith2Entries = groupedByKeys.filter(keyWithEntries => keyWithEntries._2.size == 2)

  // Without this guard the loop below would pass vacuously when nothing was joined twice.
  withClue("no key produced two join results, so nothing would be verified: ") {
    keysWith2Entries.isEmpty shouldBe false
  }

  keysWith2Entries.foreach(keyWithEntries => {
    val entries = keyWithEntries._2
    val metric1 = entries(0)
    val metric2 = entries(1)
    // Validate that the two join results differ by between 9 and 10 seconds.
    val diffBetweenEvents = metric2.joinedEventMillis - metric1.joinedEventMillis
    val timeDiffSecs = diffBetweenEvents/1000
    withClue(s"key=${keyWithEntries._1}, gap between join results=${diffBetweenEvents} ms: ") {
      (timeDiffSecs >= 9 && timeDiffSecs <= 10) shouldBe true
    }
  })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment