Last active
June 21, 2021 15:32
-
-
Save frojasg/146acd867e687334d95c79e8a11f5ad9 to your computer and use it in GitHub Desktop.
playing with coroutines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package frojasg.marketprices | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.ticker | |
import kotlinx.coroutines.flow.* | |
import java.time.Clock | |
import java.time.Instant | |
import java.lang.IllegalStateException | |
class PriceSyncer(val symbol: String, val clock: Clock) { | |
data class CurrentPrice(val priceCents: Long, val time: Instant) | |
var currentPrice: CurrentPrice = CurrentPrice(0, clock.instant()) | |
suspend fun fetchPrice(): CurrentPrice { | |
delay(200) // pretend we are doing something useful here | |
// Prices only goes up! :rocket: | |
currentPrice = currentPrice.copy(priceCents = currentPrice.priceCents + 1, time = clock.instant()) | |
println("${clock.instant()}: got price $symbol: $currentPrice") | |
check(currentPrice.priceCents < 10 || symbol != "TSLA") | |
return currentPrice | |
} | |
suspend fun sync() = coroutineScope { | |
val sharedFlow = flow { | |
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) | |
while (true) { | |
emit(fetchPrice()) | |
tickerChannel.receive() | |
} | |
} | |
// .catch { error -> println("can generate price $error") } | |
.conflate() | |
val flow = sharedFlow | |
.shareIn( | |
scope = this, | |
started = SharingStarted.Eagerly, | |
replay = 1, | |
) | |
launch { | |
saveValueProcess(flow) | |
} | |
launch { | |
publishMessageProcess(flow) | |
} | |
launch { | |
updateCacheProcess(flow) | |
} | |
} | |
private suspend fun saveValueProcess(priceFlow: SharedFlow<CurrentPrice>) { | |
priceFlow.collect { | |
delay(100) | |
println("${clock.instant()}: Updates database $symbol: $it") | |
} | |
} | |
private suspend fun updateCacheProcess(priceFlow: SharedFlow<CurrentPrice>) { | |
priceFlow | |
.onEach { | |
delay(50) | |
if (it.priceCents % 3L == 0L) { | |
throw IllegalStateException("lalalla") | |
} | |
println("${clock.instant()}: Updates Cache $symbol: $it") | |
} | |
.catch { cause -> println("${clock.instant()}: error updating cache: $cause ") } | |
.collect() | |
} | |
private suspend fun publishMessageProcess(flow: SharedFlow<CurrentPrice>) { | |
flow.collect { | |
delay(3000) | |
println("${clock.instant()}: publish message $symbol: $it") | |
} | |
} | |
} | |
data class PrinceSyncManager(val clock: Clock) { | |
suspend fun manage(symbolsChannel: ReceiveChannel<Set<String>>) = supervisorScope { | |
val state = mutableMapOf<String, Job>() | |
while (isActive) { | |
val symbols = symbolsChannel.receive() | |
symbols.filter { it !in state.keys } | |
.onEach { | |
println("launching syncer of $it") | |
state[it] = launch { | |
PriceSyncer(it, clock).sync() | |
} | |
} | |
state.keys | |
.filter { it !in symbols } | |
.onEach { | |
println("cancelling syncer for $it") | |
state[it]?.cancel() | |
state.remove(it) | |
} | |
} | |
} | |
} | |
fun main() = runBlocking<Unit> { | |
val clock = Clock.systemDefaultZone() | |
println("sdadas1") | |
val manager = PrinceSyncManager(clock) | |
val symbolsChannel = Channel<Set<String>>(1) | |
val managerJob = launch { | |
manager.manage(symbolsChannel) | |
} | |
println("${clock.instant()}: let's take a quick nap before start sycning") | |
delay(1000) | |
println("${clock.instant()}: ok let's sync APPL, SQ, and TSLA") | |
symbolsChannel.send(setOf("AAP", "SQ", "TSLA")) | |
println("${clock.instant()}: let's wait for 10 seconds and then change the list") | |
delay(10000) | |
println("${clock.instant()}: ok let's sync APPL, SQ, and T") | |
symbolsChannel.send(setOf("AAP", "SQ", "T")) | |
delay(10000) | |
managerJob.cancel() | |
println("sdadas") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment