Skip to content

Instantly share code, notes, and snippets.

@Andrew0000
Created January 22, 2025 12:56
Show Gist options
  • Save Andrew0000/8599fa45439b5e9b59b5481b38ed75d3 to your computer and use it in GitHub Desktop.
Save Andrew0000/8599fa45439b5e9b59b5481b38ed75d3 to your computer and use it in GitHub Desktop.
Flow throttleFirst
fun <T> Flow<T>.throttleFirst(
delayMillis: Long,
dispatcher: CoroutineDispatcher = Dispatchers.IO, // Unbounded dispatcher by default
): Flow<T> {
val delayScope = CoroutineScope(SupervisorJob() + dispatcher)
var isOpen = true
return this
.transform {
if (isOpen) {
isOpen = false
delayScope.launch {
delay(delayMillis)
isOpen = true
}
emit(it)
}
}
.onCompletion { delayScope.cancel() }
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun test_throttleFirst() = runTest {
val dispatcher = UnconfinedTestDispatcher()
val flow = flow {
for (i in 0..32) {
emit(i)
delay(100)
dispatcher.scheduler.advanceTimeBy(101)
}
}
val result = flow.throttleFirst(1000, dispatcher = dispatcher).toList()
assertEquals(listOf(0, 10, 20, 30), result)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment