Skip to content

Instantly share code, notes, and snippets.

@frojasg
Last active June 21, 2021 15:32
Show Gist options
  • Save frojasg/146acd867e687334d95c79e8a11f5ad9 to your computer and use it in GitHub Desktop.
Save frojasg/146acd867e687334d95c79e8a11f5ad9 to your computer and use it in GitHub Desktop.
playing with coroutines
package frojasg.marketprices
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.flow.*
import java.time.Clock
import java.time.Instant
import java.lang.IllegalStateException
/**
 * Simulates a periodic price feed for a single [symbol] and fans each price out to three
 * independent consumers: a database writer, a message publisher and a cache updater.
 *
 * @property symbol ticker symbol this syncer is responsible for.
 * @property clock time source used to stamp prices and log lines.
 */
class PriceSyncer(val symbol: String, val clock: Clock) {
    /** A price in cents together with the instant at which it was produced. */
    data class CurrentPrice(val priceCents: Long, val time: Instant)

    /** Most recent price produced by [fetchPrice]; starts at zero cents. */
    var currentPrice: CurrentPrice = CurrentPrice(0, clock.instant())

    /**
     * Pretends to fetch a fresh price: waits 200 ms, bumps [currentPrice] by one cent,
     * stamps it with the current instant and logs it.
     *
     * @return the updated [currentPrice].
     * @throws IllegalStateException when [symbol] is `"TSLA"` and the price has reached
     *   10 cents (simulated upstream failure).
     */
    suspend fun fetchPrice(): CurrentPrice {
        delay(200) // pretend we are doing something useful here
        // Prices only goes up! :rocket:
        currentPrice = currentPrice.copy(priceCents = currentPrice.priceCents + 1, time = clock.instant())
        println("${clock.instant()}: got price $symbol: $currentPrice")
        check(currentPrice.priceCents < 10 || symbol != "TSLA") {
            "simulated price feed failure for $symbol at $currentPrice"
        }
        return currentPrice
    }

    /**
     * Runs the price feed and its three consumers inside one [coroutineScope].
     *
     * The upstream flow alternates between emitting [fetchPrice] and waiting for the next
     * tick of a 1000 ms ticker. It is conflated and shared eagerly with a replay of 1, so
     * every consumer collects from the same upstream.
     *
     * A shared flow never completes, so this function keeps suspending until the
     * surrounding coroutine is cancelled or the upstream/one of the consumers fails.
     */
    suspend fun sync() = coroutineScope {
        val sharedFlow = flow {
            val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
            try {
                while (true) {
                    emit(fetchPrice())
                    tickerChannel.receive()
                }
            } finally {
                // The ticker is not a child of the collecting coroutine, so it has to be
                // cancelled explicitly; otherwise it keeps ticking after this flow stops.
                tickerChannel.cancel()
            }
        }
            // .catch { error -> println("can generate price $error") }
            .conflate()
        val flow = sharedFlow
            .shareIn(
                scope = this,
                started = SharingStarted.Eagerly,
                replay = 1,
            )
        launch {
            saveValueProcess(flow)
        }
        launch {
            publishMessageProcess(flow)
        }
        launch {
            updateCacheProcess(flow)
        }
    }

    /** Simulates persisting each price: waits 100 ms per value, then logs it. */
    private suspend fun saveValueProcess(priceFlow: SharedFlow<CurrentPrice>) {
        priceFlow.collect {
            delay(100)
            println("${clock.instant()}: Updates database $symbol: $it")
        }
    }

    /**
     * Simulates a cache refresh: waits 50 ms per value and fails on every price that is a
     * multiple of 3 cents.
     *
     * The failure is logged by the `catch` operator. Because `catch` ends the flow after
     * handling the exception, this consumer stops collecting after the first failure.
     */
    private suspend fun updateCacheProcess(priceFlow: SharedFlow<CurrentPrice>) {
        priceFlow
            .onEach {
                delay(50)
                if (it.priceCents % 3L == 0L) {
                    throw IllegalStateException("lalalla")
                }
                println("${clock.instant()}: Updates Cache $symbol: $it")
            }
            .catch { cause -> println("${clock.instant()}: error updating cache: $cause ") }
            .collect()
    }

    /** Simulates a slow publisher: waits 3000 ms per value, then logs it. */
    private suspend fun publishMessageProcess(flow: SharedFlow<CurrentPrice>) {
        flow.collect {
            delay(3000)
            println("${clock.instant()}: publish message $symbol: $it")
        }
    }
}
/**
 * Keeps one running [PriceSyncer] per requested symbol.
 *
 * @property clock time source handed to every syncer that gets launched.
 */
data class PrinceSyncManager(val clock: Clock) {
    /**
     * Reconciles the running syncers against each symbol set received from [symbolsChannel]:
     * symbols without a tracked job get a new syncer, and tracked symbols that are missing
     * from the set get their job cancelled and forgotten.
     *
     * Runs inside a [supervisorScope], so one failing syncer does not cancel the others.
     * Loops for as long as the scope is active.
     */
    suspend fun manage(symbolsChannel: ReceiveChannel<Set<String>>) = supervisorScope {
        val syncerJobs = mutableMapOf<String, Job>()
        while (isActive) {
            val requested = symbolsChannel.receive()

            // Start a syncer for every requested symbol that is not tracked yet.
            for (symbol in requested) {
                if (symbol in syncerJobs) continue
                println("launching syncer of $symbol")
                syncerJobs[symbol] = launch { PriceSyncer(symbol, clock).sync() }
            }

            // Snapshot the stale keys first so the map can be modified while iterating.
            val stale = syncerJobs.keys.filterNot { it in requested }
            for (symbol in stale) {
                println("cancelling syncer for $symbol")
                syncerJobs.remove(symbol)?.cancel()
            }
        }
    }
}
/**
 * Demo driver: starts a [PrinceSyncManager], feeds it two successive symbol sets ten
 * seconds apart, then cancels the manager.
 *
 * The second set drops "TSLA" and adds "T", so the manager is expected to cancel one
 * syncer and launch another while leaving the remaining ones running.
 */
fun main() = runBlocking<Unit> {
    val clock = Clock.systemDefaultZone()
    println("sdadas1")
    val manager = PrinceSyncManager(clock)
    // Capacity 1 lets the first send complete even if the manager is not receiving yet.
    val symbolsChannel = Channel<Set<String>>(1)
    val managerJob = launch {
        manager.manage(symbolsChannel)
    }
    println("${clock.instant()}: let's take a quick nap before start syncing")
    delay(1000)
    // The announced symbols now match the ones actually sent.
    println("${clock.instant()}: ok let's sync AAPL, SQ, and TSLA")
    symbolsChannel.send(setOf("AAPL", "SQ", "TSLA"))
    println("${clock.instant()}: let's wait for 10 seconds and then change the list")
    delay(10000)
    println("${clock.instant()}: ok let's sync AAPL, SQ, and T")
    symbolsChannel.send(setOf("AAPL", "SQ", "T"))
    delay(10000)
    managerJob.cancel()
    println("sdadas")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment