Created
November 5, 2020 19:09
-
-
Save houssemzaier/315a93adbb9b997b51d874ef07a3cc2b to your computer and use it in GitHub Desktop.
Pushing data into one Flow from two coroutines (senders) and dispatching the received data to two receivers (consumers) that read from one shared state flow (dispatcher).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fr.francetv.francetvsport.arch.application | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.awaitClose | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.MutableStateFlow | |
import kotlinx.coroutines.flow.channelFlow | |
import kotlinx.coroutines.flow.collect | |
/**
 * Demo: two sender coroutines push into one flow, a dispatcher republishes every
 * item through a [MutableStateFlow], and two consumers read that state.
 *
 * Pipeline, all launched under a dedicated parent [Job]:
 * - sender 1 emits the integers 1..50, one every 500 ms;
 * - sender 2 emits the characters 'a'..'z', one every 2000 ms;
 * - the dispatcher collects both streams and stores each item in `shared`;
 * - each consumer prints the current `shared.value` once per 1000 ms (a poll, so
 *   items that were overwritten between two polls are never printed);
 * - the closer cancels the parent job after 10 000 ms, which ends every coroutine above.
 *
 * `main` returns once the top-level coroutine has finished being cancelled.
 */
fun main() = runBlocking {
    // A standalone Job is used as the parent on purpose. Cancelling the Job that
    // belongs to runBlocking itself (coroutineContext[Job]) would cancel runBlocking
    // and make it finish with a cancellation exception instead of returning normally.
    val parentJob = Job()
    val parentExceptionHandler = CoroutineExceptionHandler { _, throwable ->
        println("exception !! -> $throwable")
    }
    val parentContext = parentJob + parentExceptionHandler

    launch(parentContext) {
        val origin: Flow<Comparable<*>> = channelFlow {
            launch {
                for (i in 1..50) {
                    delay(500)
                    send(i)
                    println("sent the number : $i")
                }
            }.invokeOnCompletion {
                println("****1st sender is finished")
            }
            launch {
                for (c in 'a'..'z') {
                    delay(2000)
                    send(c)
                    println("sent the char : $c")
                }
            }.invokeOnCompletion {
                println("****2nd sender is finished")
            }
            // Suspends until the channel is closed or cancelled; nothing here closes it,
            // so in this demo the callback runs when the collector gets cancelled.
            awaitClose {
                println("****closing channelFlow")
            }
        }
        val shared = MutableStateFlow<Any?>(null)

        // Dispatcher. Collect exactly once: `origin` is cold, so collecting it again
        // would start a fresh pair of senders from 1 and 'a'.
        launch {
            origin.collect { item ->
                println(" dispatcher consumed this: $item")
                shared.value = item
                println(" dispatcher send this: $item")
            }
        }.invokeOnCompletion {
            println("****dispatcher is finished")
        }

        // Two identical polling consumers that differ only in their label.
        // The labels are the exact text of the existing log output.
        for (label in listOf("1st", "2st")) {
            launch {
                while (isActive) {
                    delay(1000)
                    val value = shared.value
                    println("$label consumer consumed this: $value")
                }
            }.invokeOnCompletion {
                println("****$label consumer is finished")
            }
        }

        // Closer: stops the whole demo by cancelling the shared parent job.
        launch {
            delay(10_000)
            val jobParent = parentContext[Job]
            println("closing my parent $jobParent")
            parentJob.cancel()
        }.invokeOnCompletion {
            println("****closer is finished")
        }
    }.join()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment