Last active
August 9, 2021 12:40
-
-
Save wafer-li/94f2424e92b6b2ad705c663a34b99926 to your computer and use it in GitHub Desktop.
Kotlin Flow emit when the upstream emit the same value [windowSize] times
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.InternalCoroutinesApi | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.FlowCollector | |
import kotlinx.coroutines.flow.collect | |
import java.util.concurrent.atomic.AtomicInteger | |
private val defaultKeySelector: (Any?) -> Any? = { it } | |
private val defaultAreEquivalent: (Any?, Any?) -> Boolean = { old, new -> old == new } | |
fun <T> Flow<T>.equivalentUntilWindowed(windowSize: Int): Flow<T> { | |
return this.equivalentUntilWindowedBy( | |
windowSize = windowSize, | |
keySelector = defaultKeySelector, | |
areEquivalent = defaultAreEquivalent | |
) | |
} | |
@Suppress("UNCHECKED_CAST") | |
fun <T> Flow<T>.equivalentUntilWindowed( | |
windowSize: Int, | |
areEquivalent: (old: T, new: T) -> Boolean | |
): Flow<T> = equivalentUntilWindowedBy( | |
windowSize = windowSize, | |
keySelector = defaultKeySelector, | |
areEquivalent = areEquivalent as (Any?, Any?) -> Boolean, | |
) | |
fun <T, K> Flow<T>.equivalentUntilWindowedBy(windowSize: Int, keySelector: (T) -> K): Flow<T> = | |
equivalentUntilWindowedBy( | |
windowSize = windowSize, | |
keySelector = keySelector, | |
areEquivalent = defaultAreEquivalent | |
) | |
fun <T> Flow<T>.equivalentUntilWindowedBy( | |
windowSize: Int, | |
keySelector: (T) -> Any?, | |
areEquivalent: (old: Any?, new: Any?) -> Boolean | |
): Flow<T> = when { | |
this is EquivalentUntilWindowedFlowImpl<*> | |
&& this.keySelector == keySelector | |
&& this.areEquivalent == areEquivalent | |
&& this.size == windowSize -> this // same | |
else -> EquivalentUntilWindowedFlowImpl( | |
upstream = this, | |
size = windowSize, | |
keySelector = keySelector, | |
areEquivalent = areEquivalent | |
) | |
} | |
class EquivalentUntilWindowedFlowImpl<T>( | |
private val upstream: Flow<T>, | |
val size: Int, | |
@JvmField val keySelector: (T) -> Any?, | |
@JvmField val areEquivalent: (old: Any?, new: Any?) -> Boolean | |
) : Flow<T> { | |
private val count = AtomicInteger() | |
@InternalCoroutinesApi | |
override suspend fun collect(collector: FlowCollector<T>) { | |
var previousKey: Any? = null | |
upstream.collect upstreamCollect@{ value -> | |
val key = keySelector(value) | |
if (areEquivalent(previousKey, key)) { | |
val originCount = count.get() | |
if (originCount == size) { | |
return@upstreamCollect | |
} | |
val newCount = count.incrementAndGet() | |
if (checkCount(newCount)) { | |
collector.emit(value) | |
} | |
} else { | |
previousKey = key | |
count.set(0) | |
} | |
} | |
} | |
private fun checkCount(newCount: Int): Boolean { | |
return newCount == size | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment