package coroutines |
import kotlinx.coroutines.* |
import kotlinx.coroutines.channels.Channel |
// This is an example of a three-stage multithreaded processing pipeline in Kotlin, |
// with blocking operations occurring in all three stages of the pipeline, implemented |
// using Kotlin `Channel` and coroutine objects. |
// |
// source -> filter -> output |
// |
// Thread.sleep is used to simulate a blocking IO operation. |
suspend fun source(cout: Channel<Int>) { |
println("Source starting") |
for (i in 1..10) { |
val x = (0..255).random() |
cout.send(x) |
println("Source iteration $i sent $x") |
withContext(Dispatchers.IO) { |
val sleep: Long = (400..600).random().toLong() |
Thread.sleep(sleep) |
} |
} |
cout.close() |
println("Source exiting") |
} |
suspend fun filter(cin: Channel<Int>, cout: Channel<String>) { |
println(" Filter starting") |
for (x in cin) { |
println(" Filter received $x") |
withContext(Dispatchers.IO) { |
val sleep: Long = (400..600).random().toLong() |
Thread.sleep(sleep) |
} |
val y = "'$x'" |
cout.send(y) |
println(" Filter sent $y") |
} |
cout.close() |
println(" Filter exiting") |
} |
suspend fun output(cin: Channel<String>) { |
println(" Output starting") |
for (x in cin) { |
println(" Output received $x") |
withContext(Dispatchers.IO) { |
val sleep: Long = (400..600).random().toLong() |
Thread.sleep(sleep) |
} |
} |
println(" Output exiting") |
} |
const val queueSize = 2 |
fun runAll() { |
runBlocking { |
println("runAll starting") |
val pipe1 = Channel<Int>(queueSize) |
val pipe2 = Channel<String>(queueSize) |
GlobalScope.launch { |
launch { source(pipe1) } |
launch { filter(pipe1, pipe2) } |
launch { output(pipe2) } |
}.join() |
} |
println("runAll exiting") |
} |
fun main() { |
println("main starting") |
runAll() |
println("main exiting") |
} |