Skip to content

Instantly share code, notes, and snippets.

@marcouberti
Created February 12, 2020 14:55
Show Gist options
  • Save marcouberti/053dda33068a101a02531ab8f8121520 to your computer and use it in GitHub Desktop.
Save marcouberti/053dda33068a101a02531ab8f8121520 to your computer and use it in GitHub Desktop.
Kotlin Channel with buffer and natural batching
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
// create a Channel with a buffer of size 20
// this means that until 20 the send() are not suspending waiting for a receive()
val channel = Channel<String>(20)
// let's generate some events from different coroutines
launch {
for(i in 1..20) {
channel.send("event A $i")
}
delay(200)
for(i in 21..40) {
channel.send("event A $i")
}
log("A done")
}
launch {
for(i in 1..20) {
channel.send("event B $i")
}
delay(400)
for(i in 21..40) {
channel.send("event B $i")
}
log("B done")
}
// receive the events in batches of max 10 elements
launch {
while(true) {
delay(100)
val y = channel.receiveAvailable(10)
log(y)
}
}
delay(3000)
coroutineContext.cancelChildren() // cancel all children to let main finish
}
/**
* Receive all available elements up to [max]. Suspends for the first element if the channel is empty
*/
internal suspend fun <E> ReceiveChannel<E>.receiveAvailable(max: Int): List<E> {
if (max <= 0) {
return emptyList()
}
val batch = mutableListOf<E>()
if (this.isEmpty) {
// suspend until the next message is ready
batch.add(receive())
}
fun pollUntilMax() = if (batch.size >= max) null else poll()
// consume all other messages that are ready
var next = pollUntilMax()
while (next != null) {
batch.add(next)
next = pollUntilMax()
}
return batch
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment