Created
February 12, 2020 14:55
-
-
Save marcouberti/053dda33068a101a02531ab8f8121520 to your computer and use it in GitHub Desktop.
Kotlin Channel with buffer and natural batching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.flow.* | |
import kotlinx.coroutines.channels.* | |
/**
 * Demo entry point: two producer coroutines push events into a buffered channel
 * while a single consumer drains it in batches via [receiveAvailable].
 */
fun main() = runBlocking<Unit> {
    // Channel with a buffer of 20 elements: send() only suspends once the
    // buffer is full and no receiver has made room yet.
    val channel = Channel<String>(20)

    // Producer A: 20 events, a 200 ms pause, then 20 more.
    launch {
        (1..20).forEach { channel.send("event A $it") }
        delay(200)
        (21..40).forEach { channel.send("event A $it") }
        log("A done")
    }

    // Producer B: same pattern as A, but with a 400 ms pause.
    launch {
        (1..20).forEach { channel.send("event B $it") }
        delay(400)
        (21..40).forEach { channel.send("event B $it") }
        log("B done")
    }

    // Consumer: every 100 ms take whatever is ready, at most 10 elements per batch.
    launch {
        while (true) {
            delay(100)
            log(channel.receiveAvailable(10))
        }
    }

    // Let the demo run for a while, then cancel every child coroutine
    // (the consumer loops forever) so that runBlocking can complete.
    delay(3000)
    coroutineContext.cancelChildren()
}
/**
 * Receives a batch of the elements that are currently available, up to [max].
 *
 * If no element is immediately available and the channel is still open, this
 * suspends until the first element arrives and then keeps collecting, without
 * suspending again, whatever else is already buffered.
 *
 * Elements are taken with `tryReceive()` rather than the deprecated `poll()`, so
 * a `null` element (when [E] is nullable) is treated as a regular value and does
 * not end the batch early.
 *
 * @param max maximum number of elements to return; values `<= 0` yield an empty list.
 * @return the received elements in arrival order. The list is empty when [max] is
 *   not positive or when the channel was closed normally with nothing left to read.
 * @throws Throwable the channel's close cause, if it was closed with one.
 * @throws ClosedReceiveChannelException if the channel is closed normally while
 *   this call is suspended waiting for the first element.
 */
internal suspend fun <E> ReceiveChannel<E>.receiveAvailable(max: Int): List<E> {
    if (max <= 0) {
        return emptyList()
    }
    val batch = mutableListOf<E>()
    while (batch.size < max) {
        val result = tryReceive()
        if (result.isSuccess) {
            // getOrThrow() cannot throw here; unlike getOrNull() it keeps null elements distinguishable.
            batch.add(result.getOrThrow())
        } else if (result.isClosed) {
            // Same as the old poll(): rethrow the close cause, otherwise return what we have.
            result.exceptionOrNull()?.let { throw it }
            break
        } else if (batch.isEmpty()) {
            // Nothing buffered yet and the channel is open: suspend for the first element.
            batch.add(receive())
        } else {
            // Channel is momentarily empty and we already have a batch: do not wait for more.
            break
        }
    }
    return batch
}
/**
 * Prints [message] to standard output, prefixed with the name of the calling
 * thread in square brackets.
 */
fun log(message: Any?) {
    val threadName = Thread.currentThread().name
    println("[$threadName] $message")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment