Last active
September 13, 2023 11:12
-
-
Save AWinterman/8516d4869f491176ebb270dafbb23199 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.coroutineScope | |
import kotlinx.coroutines.delay | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.channelFlow | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.launch | |
private suspend fun <T> getChunk(channel: Channel<T>, maxChunkSize: Int): List<T> { | |
// suspend until there's an element in the buffer | |
val received = channel.receive() | |
// start a chunk | |
val chunk = mutableListOf(received) | |
// no more than chunk size will be retrieved | |
while (chunk.size < maxChunkSize) { | |
val polled = channel.poll() | |
if (polled == null) { | |
// then we've reached the end of the elements currently buffered. | |
return chunk | |
} | |
chunk.add(polled) | |
} | |
return chunk | |
} | |
/** | |
* [chunked] buffers a maximum of [maxSize] elements, preferring to emit early rather than wait if less than | |
* [maxSize] | |
* | |
* If [checkIntervalMillis] is specified, chunkedNaturally suspends [checkIntervalMillis] to allow the buffer to fill. | |
*/ | |
fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>> { | |
val buffer = Channel<T>(maxSize) | |
return channelFlow { | |
coroutineScope { | |
launch { | |
[email protected] { | |
// `send` will suspend if [maxSize] elements are currently in buffer | |
buffer.send(it) | |
} | |
buffer.close() | |
} | |
launch { | |
while (!buffer.isClosedForReceive) { | |
val chunk = getChunk(buffer, maxSize) | |
[email protected](chunk) | |
delay(checkIntervalMillis) | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment