Created
June 6, 2025 06:32
-
-
Save renaudcerrato/230d6c8ef4e405acba43a761b4e2f555 to your computer and use it in GitHub Desktop.
Kotlin's Window flow operator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import kotlinx.coroutines.cancelAndJoin | |
| import kotlinx.coroutines.delay | |
| import kotlinx.coroutines.flow.Flow | |
| import kotlinx.coroutines.flow.channelFlow | |
| import kotlinx.coroutines.flow.map | |
| import kotlinx.coroutines.isActive | |
| import kotlinx.coroutines.launch | |
| import kotlinx.coroutines.sync.Mutex | |
| import kotlinx.coroutines.sync.withLock | |
| import kotlin.coroutines.CoroutineContext | |
| import kotlin.coroutines.EmptyCoroutineContext | |
| import kotlin.time.ComparableTimeMark | |
| import kotlin.time.Duration | |
| import kotlin.time.TimeMark | |
| import kotlin.time.TimeSource | |
/**
 * Returns a [Flow] that emits lists of items collected from the original [Flow] within a moving time window.
 *
 * Every upstream item is stamped with [clock] as it passes through and then handed to the overload of
 * `window` that works on [TimeMarkedValue]s. An item stays in the window for [duration] after its stamp.
 * A list is emitted whenever a new item arrives and whenever the background eviction job removes expired
 * items while the upstream is still active.
 *
 * Once the upstream completes, the eviction job is cancelled: at most one more list is emitted (only when
 * items had already expired at that moment) and the resulting flow completes. It does **not** wait for the
 * remaining items to age out one by one.
 *
 * Example:
 * ```
 * flowOf(1, 2, 3, 4, 5)
 *     .onEach { delay(100.milliseconds) } // Simulate items arriving over time
 *     .window(300.milliseconds)
 *     .collect { println(it) }
 * ```
 * Possible output (timing dependent):
 * ```
 * [1]
 * [1, 2]
 * [1, 2, 3]
 * [2, 3, 4] // Item 1 expired
 * [3, 4, 5] // Item 2 expired; the upstream then completes, so no further evictions are reported
 * ```
 *
 * @param duration The duration of the time window. Must be strictly positive.
 * @param clock The [TimeSource] to use for marking time. Defaults to [TimeSource.Monotonic].
 * @param flushContext Extra [CoroutineContext] forwarded to the timestamped overload, which uses it to
 * launch its eviction coroutine. Defaults to [EmptyCoroutineContext].
 * @return A [Flow] of lists, where each list represents the items within the current time window.
 * @throws IllegalArgumentException if [duration] is zero or negative.
 */
fun <T> Flow<T>.window(
    duration: Duration,
    clock: TimeSource.WithComparableMarks = TimeSource.Monotonic,
    flushContext: CoroutineContext = EmptyCoroutineContext,
): Flow<List<T>> {
    // A non-positive window would make the eviction loop spin on `delay(0)` without ever suspending.
    require(duration.isPositive()) { "duration must be positive, was $duration" }
    return map { TimeMarkedValue(it, clock.markNow()) }.window(duration, flushContext)
}
/**
 * Sliding time window over items that already carry their own [ComparableTimeMark].
 *
 * Each incoming item expires at `item.mark + duration`. Two coroutines share one [WindowBuffer], guarded by
 * a [Mutex]:
 *  - the collector drops expired entries, appends the new item and emits the current window;
 *  - a background eviction job, launched with [flushContext], sleeps until the next expected expiration
 *    (or for [duration] when the buffer is empty), drops expired entries and emits the window again if
 *    anything was removed.
 *
 * When the upstream completes, the eviction job is cancelled, one last flush is performed (emitting only if
 * it removed something) and the flow completes without waiting for the remaining entries to expire.
 *
 * Items must arrive with non-decreasing marks; otherwise [WindowBuffer.add] rejects them with an
 * [IllegalArgumentException].
 *
 * @param duration How long an item stays in the window after its mark. Must be strictly positive.
 * @param flushContext Extra [CoroutineContext] for the eviction coroutine. Defaults to [EmptyCoroutineContext].
 * @return A [Flow] emitting a snapshot of the window's values after every change described above.
 * @throws IllegalArgumentException if [duration] is zero or negative.
 */
fun <T> Flow<TimeMarkedValue<T, ComparableTimeMark>>.window(
    duration: Duration,
    flushContext: CoroutineContext = EmptyCoroutineContext,
): Flow<List<T>> {
    // With a non-positive duration `delay(sleep)` returns immediately, so the eviction loop below would
    // never suspend: it would burn a core and can starve the collector on a single-threaded dispatcher.
    require(duration.isPositive()) { "duration must be positive, was $duration" }

    return channelFlow {
        val buffer = WindowBuffer<T>()
        // Serializes access to `buffer` between the collector and the eviction job.
        val mutex = Mutex()

        val flushJob = launch(flushContext) {
            var sleep = duration
            while (isActive) {
                delay(sleep)
                sleep = mutex.withLock {
                    if (buffer.flush()) {
                        send(buffer.snapshot())
                    }
                    // Sleep until the head entry expires; an already-overdue head means "retry right away".
                    // With an empty buffer there is nothing to wait for, so fall back to a full window.
                    buffer.nextEvictionFromNowOrNull()?.coerceAtLeast(Duration.ZERO) ?: duration
                }
            }
        }

        collect { item ->
            mutex.withLock {
                // Evict first so the emitted snapshot never contains entries that are already expired.
                buffer.flush()
                buffer.add(item.value, expiration = item.mark + duration)
                send(buffer.snapshot())
            }
        }

        // Upstream is done: stop the eviction job before touching the buffer without the lock.
        flushJob.cancelAndJoin()
        if (buffer.flush()) {
            send(buffer.snapshot())
        }
        buffer.clear()
    }
}
/**
 * Pairs a [value] with the [TimeMark] it is associated with.
 *
 * @param V type of the carried value.
 * @param T concrete kind of time mark attached to the value.
 */
data class TimeMarkedValue<V, T : TimeMark>(
    val value: V,
    val mark: T,
)
/**
 * FIFO buffer of values, each tagged with the time mark at which it expires.
 *
 * Entries have to be added in non-decreasing expiration order, which keeps the entry that expires first at
 * the head; eviction therefore only ever inspects the front of the deque. The class performs no
 * synchronization of its own.
 *
 * @param initialCapacity initial capacity handed to the backing [ArrayDeque].
 */
internal class WindowBuffer<T>(
    initialCapacity: Int = 0,
) {
    // The `mark` of every stored entry is its expiration time, not its arrival time.
    private val buffer = ArrayDeque<TimeMarkedValue<T, ComparableTimeMark>>(initialCapacity)

    /**
     * Appends [item], which expires at [expiration].
     *
     * @throws IllegalArgumentException if [expiration] is earlier than the expiration of the last entry.
     * Equal expirations are accepted.
     */
    fun add(item: T, expiration: ComparableTimeMark) {
        val last = buffer.lastOrNull()?.mark
        require(last == null || last <= expiration) {
            "expiration must be non-decreasing (previous: $last, current: $expiration)"
        }
        buffer.addLast(TimeMarkedValue(item, expiration))
    }

    /**
     * Drops every leading entry whose expiration has passed.
     *
     * @return `true` if at least one entry was removed.
     */
    fun flush(): Boolean {
        var removed = false
        // The deque is ordered by expiration, so the scan can stop at the first entry that is still alive.
        while (buffer.firstOrNull()?.mark?.hasPassedNow() == true) {
            buffer.removeFirst()
            removed = true
        }
        return removed
    }

    /** The entry that expires first, or `null` when the buffer is empty. */
    fun firstOrNull(): TimeMarkedValue<T, ComparableTimeMark>? = buffer.firstOrNull()

    /** A fresh list of the buffered values, in insertion order. */
    fun snapshot(): List<T> = buffer.map { it.value }

    /** `true` when no entries are buffered. */
    fun isEmpty(): Boolean = buffer.isEmpty()

    /** Removes all entries. */
    fun clear() = buffer.clear()
}
/** Expiration mark of the entry at the head of the buffer, or `null` when the buffer is empty. */
internal fun <T> WindowBuffer<T>.nextEvictionOrNull(): ComparableTimeMark? {
    val head = firstOrNull() ?: return null
    return head.mark
}
/**
 * Time left until the head entry of the buffer expires, or `null` when the buffer is empty.
 * The result is negative once that expiration has already passed.
 */
internal fun <T> WindowBuffer<T>.nextEvictionFromNowOrNull(): Duration? {
    val nextEviction = nextEvictionOrNull() ?: return null
    // elapsedNow() is negative for a mark that lies in the future, so negating it gives the time remaining.
    return -nextEviction.elapsedNow()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment