Last active
February 26, 2020 10:20
-
-
Save gaerfield/291d6a5e9441a90946fac909ff0357c1 to your computer and use it in GitHub Desktop.
Comparison of kotlins Flow vs. Channel-Approach
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.kramhal.coffeebutts | |
import de.kramhal.coffeebutts.Consumer.* | |
import de.kramhal.coffeebutts.FlowOperator.* | |
import de.kramhal.coffeebutts.Producer.* | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.consumeEach | |
import kotlinx.coroutines.channels.produce | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.asFlow | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.flow.flow | |
import kotlin.system.measureTimeMillis | |
/** | |
* This example takes a look at two different approaches to typical problems with asynchronous processing of stream data in Kotlin:
* - asynchronous processing using channels | |
* - asynchronous processing using flows | |
* | |
* Problem #1 - Reading of data is slow on producer-side, because: | |
* a) either it is read from a single source (like HDD) | |
* b) or it is read from multiple slow sources (like different rest-endpoints) | |
* Problem #2 - given we have only one consumer available, consuming data is: | |
* a) either slow | |
* b) or fast | |
* | |
* Problem (1b) can be solved by using multiple coroutines, while for (1a) this approach is not possible.
* If only one slow consumer is available (2a), then the only option is to run reading and consuming in parallel,
* so they aren't blocking each other.
* | |
* The code is only meant to compare channels and flows when trying to solve these problems, and should answer:
* How to use channels for concurrent producing?
* How to provide a flow operator that lets reading and consuming run concurrently?
* How could a flow be constructed for concurrent producing?
* | |
* The printout is meant for validating the expectations. The demo produces every time the numbers from 1 to 10 | |
* with a delay of 100ms. The consumer just returns this number, in case of slow with a delay of 100ms. So the | |
* expectation would be: | |
* - sequential producing + fast consuming = about 1000ms | |
* - sequential producing + slow consuming = about 2000ms | |
* - sequential producing + parallel slow consuming = 1100ms | |
* - concurrently producing + slow consuming = 1100ms | |
* - concurrently producing + fast consuming = 100ms | |
*/ | |
/**
 * Stand-in for an expensive operation: suspends for 100 ms and then hands back its input.
 *
 * @param i the value to pass through.
 * @return [i], unchanged.
 */
suspend fun longRunningTask(i: Int): Int {
    delay(100)
    return i
}
/** Selects how the demo data is produced. */
enum class Producer {
    /** Values are emitted one after another (like reading entries from an SQL table). */
    sequential,

    /** Values are produced concurrently (like several web requests that are awaited together). */
    concurrent
}
/** Selects how expensive it is to consume a single value. */
enum class Consumer {
    /** Consuming a value incurs no extra cost. */
    fast,

    /** Consuming a value involves an expensive calculation. */
    slow
}
/** Selects whether an extra operator is placed between producer and consumer of a flow. */
enum class FlowOperator {
    /** No operator: the flow processes the data sequentially. */
    none,

    /** Producing and processing run concurrently by splitting the work into two flows. */
    buffered
}
/**
 * Channel-based variant of the demo: ten coroutines each run [longRunningTask] concurrently and send
 * their result into one shared channel, which a single consumer coroutine drains and prints.
 *
 * @param consumer [fast] prints every received value as is; [slow] passes it through
 * [longRunningTask] before printing.
 */
class Channels(
    private val consumer: Consumer = slow
) {
    /**
     * Runs the complete produce/consume pipeline once and waits for it to finish.
     *
     * @return the elapsed time in milliseconds.
     */
    suspend fun executionTime(): Long = measureTimeMillis {
        coroutineScope {
            val channel = Channel<Int>()

            // The producers return nothing of interest, so `launch` (not `async`) is the right builder.
            val producers = (1..10).map { number ->
                launch { channel.send(longRunningTask(number)) }
            }

            // Close the channel once every producer has sent its value; this ends the consumer loop.
            launch {
                producers.joinAll()
                channel.close()
            }

            launch {
                channel.consumeEach { value ->
                    println(if (consumer == fast) value else longRunningTask(value))
                }
            }
        }
    }
}
/**
 * Flow-based variant of the demo.
 *
 * @param producer whether the numbers 1..10 are produced one after another or concurrently.
 * @param consumer [fast] prints every value as is; [slow] passes it through [longRunningTask]
 * before printing.
 * @param flowOperator whether the hand-rolled [buffer] operator is inserted between producer and consumer.
 */
class Flows(
    private val producer: Producer = sequential,
    private val consumer: Consumer = slow,
    private val flowOperator: FlowOperator = none
) {
    /**
     * Builds the source flow of the numbers 1..10.
     *
     * In the [concurrent] case all values are computed in parallel *before* the flow is returned,
     * so this call itself suspends until every [longRunningTask] has finished.
     */
    private suspend fun producer(): Flow<Int> = when (producer) {
        sequential -> flow { (1..10).forEach { emit(longRunningTask(it)) } }
        concurrent -> coroutineScope {
            (1..10).map { async { longRunningTask(it) } }.awaitAll().asFlow()
        }
    }

    /**
     * Collects [ints] and prints every value.
     *
     * Only the [slow] consumer pays for an extra [longRunningTask] per value; this matches the
     * documentation of [Consumer] and the behavior of [Channels].
     */
    private suspend fun consumer(ints: Flow<Int>) = when (consumer) {
        fast -> ints.collect { println(it) }
        slow -> ints.collect { println(longRunningTask(it)) }
    }

    /**
     * Decouples upstream and downstream: the receiver flow is collected in a separate producer
     * coroutine that sends into a channel of capacity [size], while the returned flow emits
     * whatever arrives on that channel.
     */
    private fun <T> Flow<T>.buffer(size: Int = 0): Flow<T> = flow {
        coroutineScope {
            val channel = produce(capacity = size) {
                this@buffer.collect { send(it) }
            }
            channel.consumeEach { emit(it) }
        }
    }

    /**
     * Runs the configured producer/consumer combination once.
     *
     * @return the elapsed time in milliseconds.
     */
    suspend fun executionTime(): Long = measureTimeMillis {
        when (flowOperator) {
            none -> consumer(producer())
            buffered -> consumer(producer().buffer())
        }
    }
}

/**
 * Runs every demo configuration and prints the measured execution times.
 *
 * Each label states the consumer that is actually passed to the measured configuration.
 */
fun main(): Unit = runBlocking {
    println(
        """
            Channels with concurrent slow producer:
            - slow Consumer: ${Channels(consumer = slow).executionTime()}
            - fast Consumer: ${Channels(consumer = fast).executionTime()}
            Flows with sequential slow producer:
            - fast Consumer: ${Flows(producer = sequential, consumer = fast, flowOperator = none).executionTime()}
            - slow Consumer: ${Flows(producer = sequential, consumer = slow, flowOperator = none).executionTime()}
            - operator and slow Consumer: ${Flows(producer = sequential, consumer = slow, flowOperator = buffered).executionTime()}
            - operator and fast Consumer: ${Flows(producer = sequential, consumer = fast, flowOperator = buffered).executionTime()}
            Flows with concurrent slow producer:
            - slow Consumer: ${Flows(producer = concurrent, consumer = slow, flowOperator = none).executionTime()}
            - fast Consumer: ${Flows(producer = concurrent, consumer = fast, flowOperator = none).executionTime()}
            - operator and slow Consumer: ${Flows(producer = concurrent, consumer = slow, flowOperator = buffered).executionTime()}
            - operator and fast Consumer: ${Flows(producer = concurrent, consumer = fast, flowOperator = buffered).executionTime()}
        """.trimIndent()
    )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment