Created
September 29, 2019 19:23
-
-
Save erikhuizinga/6d3f555200efdad38a04064580938b71 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.Job | |
import kotlinx.coroutines.channels.BroadcastChannel | |
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED | |
import kotlinx.coroutines.delay | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.asFlow | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.flow.onCompletion | |
import kotlinx.coroutines.flow.onStart | |
import kotlinx.coroutines.joinAll | |
import kotlinx.coroutines.launch | |
import kotlinx.coroutines.runBlocking | |
import java.util.concurrent.atomic.AtomicInteger | |
import kotlin.random.Random | |
/**
 * Entry point of the demo: starts the observers, the data fetcher and the coroutine that
 * eventually closes the channel, then waits for every tracked job to finish.
 */
fun main(): Unit = runBlocking {
    // Each of these calls only launches coroutines and returns immediately.
    createAsyncObservers()
    fetchAsyncData()
    asyncCloseBroadcastChannelAndCancelJobs()

    // This is printed first: everything started above runs asynchronously.
    println("Joining all jobs")

    // Joining is optional, but it makes sure main does not end prematurely.
    jobs.joinAll()

    println("Done")
}
/** Delay, in milliseconds, used to simulate asynchronous operations such as I/O. */
private const val DELAY_TIME_MILLIS: Long = 50

/** Up to this number of coroutines will asynchronously observe data. */
private const val MAX_NUM_OBSERVERS: Int = 5

/** Up to this amount of data items will be fetched. */
private const val MAX_NUM_DATA_ITEMS: Int = 100

/** Jobs of the launched observer and fetcher coroutines, kept so they can be cancelled and joined. */
private val jobs: MutableSet<Job> = mutableSetOf()

/**
 * Channel through which fetched data items are broadcast.
 *
 * Conflated: only the last data item is kept, so new observers collect only the latest item.
 */
private val broadcastChannel: BroadcastChannel<Int> = BroadcastChannel(CONFLATED)

/** Repository that sends to and observes [broadcastChannel]. */
private val dataRepository: DataRepository = DataRepository(broadcastChannel)
/**
 * Launches [MAX_NUM_OBSERVERS] coroutines in the receiver scope and records each one in [jobs].
 *
 * Every coroutine first waits for a random time below
 * `MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS` milliseconds and then starts observing
 * [dataRepository].
 */
private fun CoroutineScope.createAsyncObservers() {
    // Exclusive upper bound of the random start delay; Int * Long yields a Long.
    val startDelayBoundMillis = MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS

    repeat(MAX_NUM_OBSERVERS) {
        val observerJob = launch {
            val startDelayMillis = Random.nextLong(startDelayBoundMillis)
            delay(startDelayMillis)
            println("Observing dataRepository")
            dataRepository.observe()
        }
        jobs.add(observerJob)
    }
}
/**
 * Launches a single coroutine in the receiver scope that asks [dataRepository] to fetch data
 * [MAX_NUM_DATA_ITEMS] times, pausing for a random time below [DELAY_TIME_MILLIS] milliseconds
 * before each fetch. The launched job is recorded in [jobs].
 */
private fun CoroutineScope.fetchAsyncData() {
    val fetcherJob = launch {
        repeat(MAX_NUM_DATA_ITEMS) {
            val pauseMillis = Random.nextLong(DELAY_TIME_MILLIS)
            delay(pauseMillis)
            dataRepository.fetchData()
        }
    }
    jobs.add(fetcherJob)
}
/**
 * Launches a coroutine that, after a random delay, closes [broadcastChannel] and cancels every
 * job recorded in [jobs], including any observers that are still pending.
 *
 * The delay is drawn from `DELAY_TIME_MILLIS` (inclusive) up to
 * `MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS` (exclusive) milliseconds. The launched coroutine
 * itself is not added to [jobs].
 */
private fun CoroutineScope.asyncCloseBroadcastChannelAndCancelJobs() {
    val latestCloseMillis = MAX_NUM_DATA_ITEMS * DELAY_TIME_MILLIS

    launch {
        val closeAfterMillis = Random.nextLong(DELAY_TIME_MILLIS, latestCloseMillis)
        delay(closeAfterMillis)
        println("Closing broadcastChannel")
        broadcastChannel.close()
        for (job in jobs) {
            job.cancel()
        }
    }
}
/**
 * Publishes an increasing integer counter through [broadcastChannel] and lets callers observe
 * the published values as a [Flow].
 *
 * @property broadcastChannel channel that data items are sent to and observed from.
 */
private class DataRepository(private val broadcastChannel: BroadcastChannel<Int>) {

    /**
     * Flow view of [broadcastChannel] that prints a message when it starts and when it completes.
     *
     * A new flow is built on every access of this property.
     */
    private val dataFlow: Flow<Int>
        get() = broadcastChannel
            .asFlow()
            .onStart { println("Flow started") }
            .onCompletion { println("Flow completed") }

    /** Collects [dataFlow], printing every collected item; suspends while collecting. */
    suspend fun observe() {
        println("Collecting dataFlow")
        dataFlow.collect { println("Collected $it") }
    }

    /**
     * Simulates fetching one data item: waits [DELAY_TIME_MILLIS] milliseconds, then increments
     * the shared counter and sends the new value to [broadcastChannel].
     *
     * Nothing is sent, and the counter is left untouched, when the channel is already closed for
     * sending at the time of the check.
     */
    suspend fun fetchData() {
        delay(DELAY_TIME_MILLIS)
        // NOTE(review): the closed check and the send are two separate steps; confirm that no
        // other thread can close the channel in between before relying on this guard.
        broadcastChannel
            .takeUnless { it.isClosedForSend }
            ?.send(data.incrementAndGet().also { println("Sent $it") })
    }

    private companion object {
        // Atomic, because this can be written and read from various coroutines.
        // Declared as val: the reference is never reassigned, only the AtomicInteger is mutated.
        // Living in the companion object, the counter is shared by all DataRepository instances.
        private val data = AtomicInteger(0)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment