Skip to content

Instantly share code, notes, and snippets.

@danielesegato
Last active January 25, 2021 15:00
Show Gist options
  • Save danielesegato/3d33b5272b7ce787d0e3dde1d74acc77 to your computer and use it in GitHub Desktop.
Save danielesegato/3d33b5272b7ce787d0e3dde1d74acc77 to your computer and use it in GitHub Desktop.
TestFlowCollector to test kotlin coroutines Flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.Assert.*
import kotlin.coroutines.coroutineContext
import kotlin.reflect.KClass
class TestFlowCollector<T> {
private var job: Job? = null
private val _values = mutableListOf<T>()
var error: Throwable? = null
get() = synchronized(this) {
field
}
private set(value) {
synchronized(this) {
field = value
}
}
var completed: Boolean = false
get() = synchronized(this) { field }
private set(value) {
synchronized(this) {
field = value
}
}
val values: List<T>
get() = synchronized(this) {
_values.toList()
}
internal fun testOn(scope: CoroutineScope, flow: Flow<T>) {
synchronized(this) {
check(job == null) { "this TestFlowCollector testOn has already been used" }
job = scope.launch {
flow
.catch { e -> error = e }
.onCompletion { completed = true }
.collect { item ->
synchronized(this) {
_values.add(item)
}
}
}
}
}
fun cancel() {
val j = job
check(j != null) { "this TestFlowCollector testOn has never been used" }
j.cancel()
}
}
fun <T> Flow<T>.test(scope: CoroutineScope): TestFlowCollector<T> {
val testCollector = TestFlowCollector<T>()
testCollector.testOn(scope, this)
return testCollector
}
fun <T> TestFlowCollector<T>.lastValue(): T? = values.lastOrNull()
fun <T> TestFlowCollector<T>.assertEmittedCount(
count: Int,
message: String = "Emission count does not match expected"
) {
require(count >= 0) { "asserting emitted count less then zero makes no sense" }
assertEquals(message, count, values.size)
}
fun <T> TestFlowCollector<T>.assertNothingEmitted(message: String = "Expected nothing emitted") {
assertEmittedCount(0, message)
}
fun <T> TestFlowCollector<T>.assertEmittedValuesEquals(
values: List<T>,
message: String = "Emitted values expectation is not met"
) {
assertEquals(message, values, this.values)
}
fun <T> TestFlowCollector<T>.assertEmittedValuesEquals(
first: T,
vararg others: T,
message: String = "Emitted values expectation is not met"
) {
assertEquals(message, listOf(first, *others), values)
}
fun <T> TestFlowCollector<T>.assertEmittedValuesSame(
values: List<T>,
message: String = "Emitted values expectation is not met"
) {
values
.forEachIndexed { idx, item ->
assertSame("$message (on index $idx)", values[idx], item)
}
}
fun <T> TestFlowCollector<T>.assertEmittedValuesContains(
value: T,
message: String = "Value expected to be emitted was not"
) {
val found = values.firstOrNull { it == value }
assertEquals(message, value, found)
}
fun <T> TestFlowCollector<T>.assertCompleted(message: String = "Expected completed but was not") {
assertTrue(message, completed)
}
fun <T> TestFlowCollector<T>.assertNotCompleted(message: String = "Expected not completed but was") {
assertFalse(message, completed)
}
fun <T> TestFlowCollector<T>.assertNoError(message: String = "Expected no error but there was one") {
assertNull(message, error)
}
fun <T, E : Throwable> TestFlowCollector<T>.assertErrorThrown(
clazz: KClass<E>,
message: String = "Error expectation was not met"
): E? {
val e = error
assertNotNull(message, e)
if (e != null) {
assertEquals(message, clazz, e::class)
}
return e as? E
}
fun <T, E : Throwable> TestFlowCollector<T>.assertErrorThrown(
clazz: Class<E>,
message: String = "Error expectation was not met"
): E? {
val e = error
assertNotNull(message, e)
if (e != null) {
assertEquals(message, clazz, e::class.java)
}
return e as? E
}
fun <T> TestFlowCollector<T>.assertLastEmittedValueEquals(
v: T,
message: String = "Last Emitted value did not met expectations"
) {
assertEquals(message, v, lastValue())
}

Licence

Feel free to use this as you please, But please mention where it comes from :)

Author: Daniele Segato (https://github.com/danielesegato)

I'll add a proper licence in the future.

Disclaimer

It's not a library right now cause I just used it in a couple of project and I wanna see if there's more needed before I turn it into an actual library.

It's currently not well documented but it works.

I've written this fairly quickly with what I needed to go on and unit tests my classes with Flow. Can it be improved? Definitely. Will I improve it? Eventually. Is it tested? Not right now, no :) Will it be? When I release it as a library, provided someone else didn't make something better already, it will.

Usage

you can use test(scope) to create a TestFlowCollector that will start collecting from the flow and record everything that happens so that you can assert on it.

The TestFlowCollector will keep collecting until it completes or it is canceled.

@Test
fun test() = runBlockingTest {
  val myFlow = sut.myCodeThatGenerateAFlow()
  
  // build a TestFlowCollector
  val testCollector = myFlow.test(this) // this = the coroutine scope
  
  testCollector.assertEmittedCount(1)
  testCollector.assertNotCompleted()
  
  val last = testCollector.lastValue()
  // perform whatever assertion you want on the last value emitted (if any)
  
  // make your code produce another event
  sut.generateAnotherEvent()
  
  testCollector.assertEmittedCount(2)
  testCollector.assertLastEmittedValueEquals(whatever)

  testCollector.assertNoError()
  // dispose the flow (only needed if your flow never completes)
  testCollector.cancel()
  
}

each assertion is implemented as an Extension function so you can extend it by adding assertions easily yourself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment