Skip to content

Instantly share code, notes, and snippets.

@erikhuizinga
Created September 29, 2019 19:23
Show Gist options
  • Save erikhuizinga/6d3f555200efdad38a04064580938b71 to your computer and use it in GitHub Desktop.
Save erikhuizinga/6d3f555200efdad38a04064580938b71 to your computer and use it in GitHub Desktop.
package com.example
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
fun main() = runBlocking {
createAsyncObservers()
fetchAsyncData()
asyncCloseBroadcastChannelAndCancelJobs()
println("Joining all jobs") // Note that this prints first; everything before happens async
jobs.joinAll() // Joining is optional, but it ensures the main function doesn't end prematurely
println("Done")
}
private const val DELAY_TIME_MILLIS = 50L // Delay to simulate asynchronous operations, e.g. I/O
private const val MAX_NUM_OBSERVERS = 5 // Up to this number of coroutines will async observe data
private const val MAX_NUM_DATA_ITEMS = 100 // Up to this amount of data items will be fetched
private val jobs = mutableSetOf<Job>()
// Conflated: keep only last data item, so new observers collect only the latest item
private val broadcastChannel = BroadcastChannel<Int>(CONFLATED)
private val dataRepository = DataRepository(broadcastChannel)
private fun CoroutineScope.createAsyncObservers() {
repeat(MAX_NUM_OBSERVERS) {
jobs += launch {
delay(Random.nextLong(MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS))
println("Observing dataRepository")
dataRepository.observe()
}
}
}
private fun CoroutineScope.fetchAsyncData() {
jobs += launch {
repeat(MAX_NUM_DATA_ITEMS) {
delay(Random.nextLong(DELAY_TIME_MILLIS))
dataRepository.fetchData()
}
}
}
/** Launch a coroutine that will eventually close the channel and cancel any pending observers */
private fun CoroutineScope.asyncCloseBroadcastChannelAndCancelJobs() {
launch {
delay(Random.nextLong(DELAY_TIME_MILLIS, MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS))
println("Closing broadcastChannel")
broadcastChannel.close()
jobs.forEach { it.cancel() }
}
}
private class DataRepository(private val broadcastChannel: BroadcastChannel<Int>) {
private companion object {
// Atomic, because this can be written and read from various coroutines
private var data = AtomicInteger(0)
}
private val dataFlow: Flow<Int>
get() = broadcastChannel
.asFlow()
.onStart { println("Flow started") }
.onCompletion { println("Flow completed") }
suspend fun observe() {
println("Collecting dataFlow")
dataFlow.collect { println("Collected $it") }
}
suspend fun fetchData() {
delay(DELAY_TIME_MILLIS)
broadcastChannel
.takeUnless { it.isClosedForSend }
?.send(data.incrementAndGet().also { println("Sent $it") })
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment