Last active
December 17, 2018 22:02
-
-
Save kevinherron/5a895911a45eb5752ff247a205d618b7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.channels.* | |
import kotlinx.coroutines.delay | |
import java.time.Duration | |
import java.util.concurrent.TimeUnit | |
/**
 * Convenience overload of [debounce] taking the timeout in milliseconds.
 *
 * @param timeout debounce window, in milliseconds.
 * @param scope scope passed through to the [Duration]-based overload.
 * @return whatever the [Duration]-based overload returns for the same window.
 */
fun <T> ReceiveChannel<T>.debounce(timeout: Long, scope: CoroutineScope): ReceiveChannel<T> {
    val window = Duration.ofMillis(timeout)
    return debounce(window, scope)
}
/**
 * Returns a channel that relays the elements of this channel through a
 * debouncing [SendChannel] (see the `SendChannel<T>.debounce` overload).
 *
 * A producer coroutine is started in [scope] with a conflated output channel.
 * It consumes this channel to completion and pushes every element into the
 * debouncing actor, which in turn writes to the producer's output channel.
 *
 * The debouncing actor is closed once the upstream has been fully consumed
 * (or consumption failed/was cancelled), so that it can terminate instead of
 * staying suspended in [scope] forever.
 *
 * NOTE(review): the output channel is closed as soon as the upstream is
 * exhausted, so an element still waiting inside the debouncing actor at that
 * moment is presumably not delivered — confirm this is the intended contract.
 *
 * @param timeout minimum spacing enforced by the debouncing actor.
 * @param scope scope in which both the producer and the actor are started.
 * @return the producer's receive side.
 */
fun <T> ReceiveChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): ReceiveChannel<T> {
    return scope.produce(capacity = Channel.CONFLATED) {
        // `channel` is this producer's own output; wrap it so writes are debounced.
        val db = channel.debounce(timeout, scope)
        try {
            // consumeEach also cancels the upstream channel when we stop iterating.
            this@debounce.consumeEach { db.send(it) }
        } finally {
            // Without this the actor would wait on an input that never ends.
            db.close()
        }
    }
}
/**
 * Convenience overload of [debounce] taking the timeout in milliseconds.
 *
 * @param timeout debounce window, in milliseconds.
 * @param scope scope passed through to the [Duration]-based overload.
 * @return whatever the [Duration]-based overload returns for the same window.
 */
fun <T> SendChannel<T>.debounce(timeout: Long, scope: CoroutineScope): SendChannel<T> {
    val window = Duration.ofMillis(timeout)
    return debounce(window, scope)
}
/**
 * Returns a [SendChannel] that forwards elements to this channel while keeping
 * consecutive forwards at least [timeout] apart.
 *
 * An actor with a conflated mailbox is started in [scope]. For every element it
 * takes from the mailbox it waits out whatever is left of the timeout since the
 * previous forward, then sends the element to this channel. The very first
 * element is forwarded without any wait.
 *
 * The wait is passed to [delay] in whole milliseconds, so a remaining time
 * below one millisecond is truncated to zero.
 *
 * @param timeout minimum time between two forwards to this channel.
 * @param scope scope in which the actor coroutine is started.
 * @return the actor's mailbox; send to it instead of sending to this channel.
 */
fun <T> SendChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): SendChannel<T> {
    return scope.actor(capacity = Channel.CONFLATED) {
        val timeoutNanos = timeout.toNanos()
        // null until the first element has been forwarded. System.nanoTime() has
        // an arbitrary (possibly negative) origin, so a numeric sentinel such as
        // -1 cannot reliably stand for "never sent".
        var lastSendNanos: Long? = null
        channel.consumeEach { element ->
            val previousSend = lastSendNanos
            if (previousSend != null) {
                val elapsedNanos = System.nanoTime() - previousSend
                val remainingNanos = (timeoutNanos - elapsedNanos)
                    .coerceAtMost(timeoutNanos)
                if (remainingNanos > 0) {
                    delay(TimeUnit.NANOSECONDS.toMillis(remainingNanos))
                }
            }
            this@debounce.send(element)
            lastSendNanos = System.nanoTime()
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Timing-based tests for the `debounce` channel extensions.
 *
 * Each case produces the integers 0..9 with a fixed pause between sends and
 * checks how many of them come out of the debounced channel.
 *
 * GlobalScope is used deliberately: the debounce coroutines are started in the
 * scope handed to `debounce` and may outlive the consumer, so tying them to the
 * test's `runBlocking` scope could keep the test from completing.
 */
class DebounceTest {

    /** Debounces the receiving side of a producer. */
    @Test
    fun testReceiveChannelDebounce() = runBlocking {
        fun newProducer(delayMillis: Long) = GlobalScope.produce {
            (0..9).forEach {
                send(it)
                delay(delayMillis)
            }
        }

        consumeAndAssertCount(newProducer(0).debounce(100, GlobalScope), 1)
        consumeAndAssertCount(newProducer(100).debounce(600, GlobalScope), 2)
        consumeAndAssertCount(newProducer(100).debounce(150, GlobalScope), 7)
        consumeAndAssertCount(newProducer(100).debounce(50, GlobalScope), 10)
    }

    /** Debounces the sending side: the producer writes through a debounced view of its own channel. */
    @Test
    fun testSendChannelDebounce() = runBlocking {
        fun newProducer(delay: Long, timeout: Long) = GlobalScope.produce<Int> {
            val db = this.channel.debounce(timeout, GlobalScope)
            (0..9).forEach {
                db.send(it)
                delay(delay)
            }
        }

        consumeAndAssertCount(newProducer(delay = 0, timeout = 100), 1)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 600), 2)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 150), 7)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 50), 10)
    }

    /**
     * Drains [db], printing each element with the time since the previous one,
     * and asserts that exactly [count] elements were received.
     *
     * Suspends instead of calling `runBlocking`, because it is only invoked
     * from inside the tests' own `runBlocking` blocks.
     */
    private suspend fun consumeAndAssertCount(db: ReceiveChannel<Int>, count: Int) {
        var consumed = 0
        var lastReceived = 0L

        db.consumeEach {
            val now = System.currentTimeMillis()
            if (lastReceived > 0) {
                println("$it @ +${now - lastReceived}ms")
            } else {
                println("$it @ initial")
            }
            lastReceived = now
            consumed++
        }

        // Expected value first, so a failure message reports the two the right way round.
        assertEquals(count, consumed)
    }
}
Thanks, updated.
Updated for coroutines 1.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You need to close the original channel when iteration on the initial one is finished. You can do so by replacing the for loop with `consumeEach`, or by wrapping the whole loop in a `consume` lambda.