@Zimins
Last active June 19, 2021 07:55
Part of StateFlow.kt (kotlinx.coroutines)
override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        // The loop is arranged so that it starts delivering current value without waiting first
        while (true) {
            // Here the coroutine could have waited for a while to be dispatched,
            // so we use the most recent state here to ensure the best possible conflation of stale values
            val newState = _state.value
            // always check for cancellation
            collectorJob?.ensureActive()
            // Conflate value emissions using equality
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
            if (!slot.takePending()) { // try fast-path without suspending first
                slot.awaitPending() // only suspend for new values when needed
            }
        }
    } finally {
        freeSlot(slot)
    }
}
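The conflation in the loop above can be observed from the outside. The following is a minimal sketch, not part of the gist: the function name collectConflated is made up for illustration, and it assumes a single-threaded runBlocking event loop so that the collector only runs when the main coroutine calls yield().

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a single collector actually received.
fun collectConflated(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // UNDISPATCHED runs the collector up to its first suspension,
    // so the current value (0) is delivered before launch returns.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    // Three updates with no suspension in between: the collector has not run yet,
    // so when it wakes up it reads only the most recent state.
    state.value = 1
    state.value = 2
    state.value = 3
    yield() // let the collector run; it emits 3

    // The state changes and then returns to the last emitted value.
    // The collector wakes up, finds the state equal to its oldState, and emits nothing.
    state.value = 4
    state.value = 3
    yield()

    job.cancel() // collect never returns on its own, so cancel it
    seen.toList()
}

fun main() {
    println(collectConflated()) // expected under the assumptions above: [0, 3]
}
```

The intermediate values 1, 2 and 4 are never delivered because the loop reads _state.value only after the collector is resumed, and the equality check skips the second wake-up entirely.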
Zimins commented Jun 19, 2021

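The difference from a plain Flow: collect here never completes. It delivers the current value first, then loops forever and emits only when the state is not equal to the last emitted value.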
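A small sketch of that contrast, assuming kotlinx.coroutines is on the classpath. The function names coldValues and stateFlowCollectorStaysActive are invented for this example and do not come from the library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A cold flow emits every value, including equal neighbours, and then completes,
// which is why toList() can return.
fun coldValues(): List<Int> = runBlocking {
    flowOf(1, 1, 2).toList()
}

// A StateFlow collector receives the current value and then waits for updates.
// Its job is still active after a delay because collect does not complete by itself.
fun stateFlowCollectorStaysActive(): Boolean = runBlocking {
    val state = MutableStateFlow(1)
    val job = launch { state.collect { } }
    delay(50)
    val stillActive = job.isActive
    job.cancel() // the only way to end the collection here
    stillActive
}

fun main() {
    println(coldValues())                    // [1, 1, 2]
    println(stateFlowCollectorStaysActive()) // true
}
```

In practice this means terminal operators that wait for completion, such as toList(), would suspend forever on a StateFlow unless the flow is first bounded with something like take or first.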
