Skip to content

Instantly share code, notes, and snippets.

@ncomet
Last active December 18, 2023 09:32
Show Gist options
  • Save ncomet/4c2f000af7dfabcd31411704015783dc to your computer and use it in GitHub Desktop.
Save ncomet/4c2f000af7dfabcd31411704015783dc to your computer and use it in GitHub Desktop.
2 flows sync on channel
package org.example
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
suspend fun main() = coroutineScope {
val events = MutableSharedFlow<String>()
val commands = MutableSharedFlow<String>()
val commandsChannel = Channel<String>()
launch { convertCommands(commands, commandsChannel) }
launch { listenToEvents(events, commandsChannel) }
launch { events.emit("1") }
delay(1000)
launch { commands.emit("1") }
delay(1000)
launch { events.emit("2") }
delay(1000)
launch { commands.emit("99") }
launch { commands.emit("2") }
delay(1000)
launch { events.emit("3") }
delay(1000)
launch { events.emit("4") }
launch { events.emit("5") }
delay(1000)
launch { commands.emit("5") }
delay(1000)
launch { events.emit("6") }
launch { events.emit("7") }
delay(1000)
}
suspend fun convertCommands(commands: MutableSharedFlow<String>, commandsChannel: Channel<String>) {
commands.collect {
commandsChannel.send(it)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun listenToEvents(events: MutableSharedFlow<String>, commandsChannel: ReceiveChannel<String>) {
events.collect {
println("Received Event: $it")
coroutineScope {
val waitJob = launch { waitForCommand(commandsChannel, it) }
launch {
select {
waitJob.onJoin {
println("Received Command: $it")
}
onTimeout(5000) {
println("Timeout, did not receive Command for Event: $it")
waitJob.cancel()
}
}
}
}
}
}
suspend fun waitForCommand(commandsChannel: ReceiveChannel<String>, expectedCommand: String) {
for (command in commandsChannel) {
if (command == expectedCommand) {
break
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment