Skip to content

Instantly share code, notes, and snippets.

@renaudcerrato
Created June 6, 2025 06:32
Show Gist options
  • Select an option

  • Save renaudcerrato/230d6c8ef4e405acba43a761b4e2f555 to your computer and use it in GitHub Desktop.

Select an option

Save renaudcerrato/230d6c8ef4e405acba43a761b4e2f555 to your computer and use it in GitHub Desktop.
Kotlin's Window flow operator
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.ComparableTimeMark
import kotlin.time.Duration
import kotlin.time.TimeMark
import kotlin.time.TimeSource
/**
 * Groups the values of this [Flow] into a sliding time window and emits the content of that window as a list.
 *
 * Every upstream value is stamped with a mark taken from [clock] when it arrives, and it stays in the
 * window until [duration] has elapsed since that mark. A snapshot of the window is emitted each time a
 * value arrives, and each time expired values are evicted by the background flush.
 *
 * Example:
 * ```
 * flowOf(1, 2, 3, 4, 5)
 *     .onEach { delay(100.milliseconds) } // Simulate items arriving over time
 *     .window(300.milliseconds)
 *     .collect { println(it) }
 * ```
 * Possible output (timing dependent):
 * ```
 * [1]
 * [1, 2]
 * [1, 2, 3]
 * [2, 3, 4] // Item 1 expired
 * [3, 4, 5] // Item 2 expired
 * ```
 * Once the upstream completes, the background flush is cancelled and a single final flush is performed,
 * so a trailing snapshot is only emitted for values that have already expired at that moment.
 *
 * @param duration How long a value stays in the window after it has been stamped.
 * @param clock The [TimeSource] used to stamp incoming values. Defaults to [TimeSource.Monotonic].
 * @param flushContext Additional [CoroutineContext] for the coroutine that evicts expired values.
 * Defaults to [EmptyCoroutineContext].
 * @return A [Flow] of lists, where each list holds the values currently inside the time window.
 */
fun <T> Flow<T>.window(
    duration: Duration,
    clock: TimeSource.WithComparableMarks = TimeSource.Monotonic,
    flushContext: CoroutineContext = EmptyCoroutineContext,
): Flow<List<T>> {
    // Stamp each value on arrival, then delegate to the overload working on stamped values.
    val stamped: Flow<TimeMarkedValue<T, ComparableTimeMark>> = map { value ->
        TimeMarkedValue(value, clock.markNow())
    }
    return stamped.window(duration, flushContext)
}
/**
 * Groups already time-stamped values into a sliding time window and emits the content of that window
 * as a list.
 *
 * Each value expires at `mark + duration`. A snapshot of the window is sent downstream:
 * - whenever an upstream value arrives (after evicting expired values and appending the new one), and
 * - whenever the background flush coroutine evicts at least one expired value.
 *
 * When the upstream completes, the flush coroutine is cancelled, one final flush is performed, and a
 * last snapshot is sent only if that flush evicted something.
 *
 * @param duration How long a value stays in the window after its mark; must be strictly positive.
 * @param flushContext Additional [CoroutineContext] used to launch the flush coroutine.
 * Defaults to [EmptyCoroutineContext].
 * @return A [Flow] of lists, where each list holds the values currently inside the time window.
 * @throws IllegalArgumentException if [duration] is zero or negative.
 */
fun <T> Flow<TimeMarkedValue<T, ComparableTimeMark>>.window(
    duration: Duration,
    flushContext: CoroutineContext = EmptyCoroutineContext,
): Flow<List<T>> {
    // With a non-positive duration the flush loop below would never suspend: delay() returns
    // immediately and the uncontended lock does not suspend either, so the loop would spin.
    require(duration.isPositive()) { "duration must be positive, but was $duration" }

    return channelFlow {
        val buffer = WindowBuffer<T>()
        // Guards the buffer, which is touched by both the collector and the flush coroutine.
        val mutex = Mutex()

        val flushJob = launch(flushContext) {
            var sleep = duration
            while (isActive) {
                delay(sleep)
                sleep = mutex.withLock {
                    if (buffer.flush()) {
                        send(buffer.snapshot())
                    }
                    // Sleep until the oldest remaining value expires, or a full window if empty.
                    buffer.nextEvictionFromNowOrNull()?.coerceAtLeast(Duration.ZERO) ?: duration
                }
            }
        }

        collect { item ->
            mutex.withLock {
                buffer.flush()
                buffer.add(item.value, expiration = item.mark + duration)
                send(buffer.snapshot())
            }
        }

        // Upstream is done: stop the flush coroutine before touching the buffer without the lock.
        flushJob.cancelAndJoin()
        if (buffer.flush()) {
            send(buffer.snapshot())
        }
        buffer.clear()
    }
}
/**
 * A [value] paired with a time [mark].
 *
 * @param V Type of the carried value.
 * @param T Type of the time mark attached to the value.
 * @property value The carried value.
 * @property mark The time mark associated with [value].
 */
data class TimeMarkedValue<V, T : TimeMark>(
    val value: V,
    val mark: T,
)
/**
 * FIFO buffer of values, each paired with the time mark at which it expires.
 *
 * Entries must be added in non-decreasing expiration order, so the oldest entry is always the next
 * one to expire. The class performs no synchronization of its own.
 *
 * @param initialCapacity Initial capacity of the backing [ArrayDeque].
 */
internal class WindowBuffer<T>(
    initialCapacity: Int = 0,
) {
    // Entries reuse TimeMarkedValue; here `mark` holds the expiration mark of the value.
    private val buffer = ArrayDeque<TimeMarkedValue<T, ComparableTimeMark>>(initialCapacity)

    /**
     * Appends [item], which expires at [expiration].
     *
     * @throws IllegalArgumentException if [expiration] is earlier than the expiration of the
     * previously added entry. Equal expirations are accepted.
     */
    fun add(item: T, expiration: ComparableTimeMark) {
        buffer.lastOrNull()?.mark?.let { last ->
            require(last <= expiration) {
                "expiration must be monotonically non-decreasing (previous: $last, current: $expiration)"
            }
        }
        buffer.addLast(TimeMarkedValue(item, expiration))
    }

    /**
     * Removes every entry whose expiration mark has passed.
     *
     * @return `true` if at least one entry was removed.
     */
    fun flush(): Boolean {
        var removed = false
        // The buffer is sorted by expiration mark, so eviction stops at the first live entry.
        while (buffer.firstOrNull()?.mark?.hasPassedNow() == true) {
            buffer.removeFirst()
            removed = true
        }
        return removed
    }

    /** Returns the oldest entry (the next one to expire), or `null` if the buffer is empty. */
    fun firstOrNull(): TimeMarkedValue<T, ComparableTimeMark>? = buffer.firstOrNull()

    /** Returns a new list with the buffered values, oldest first. */
    fun snapshot(): List<T> = buffer.map { it.value }

    /** Returns `true` if the buffer holds no entries. */
    fun isEmpty(): Boolean = buffer.isEmpty()

    /** Removes all entries. */
    fun clear() = buffer.clear()
}
/** Returns the expiration mark of the oldest buffered entry, or `null` if the buffer is empty. */
internal fun <T> WindowBuffer<T>.nextEvictionOrNull(): ComparableTimeMark? {
    val oldest = firstOrNull()
    return oldest?.mark
}
/**
 * Returns the time remaining until the oldest buffered entry expires, or `null` if the buffer is empty.
 *
 * The result is the negated elapsed time of the expiration mark, so it is negative when that mark
 * has already passed.
 */
internal fun <T> WindowBuffer<T>.nextEvictionFromNowOrNull(): Duration? {
    val eviction = nextEvictionOrNull() ?: return null
    return -eviction.elapsedNow()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment