Created
January 12, 2020 20:22
-
-
Save juliuscanute/0a72a90da46469674d88f9ed77c862bf to your computer and use it in GitHub Desktop.
[FanOut Demultiplexer - Multiple Subscription] #fanout #demultiplexer #subscription #kotlin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A demultiplexer: every item received from an input channel is forwarded to
 * the channel of the first rule whose predicate evaluates to `true` for it.
 *
 * Each [Rule] is used as a pair: `first` is the destination channel and
 * `second` is the predicate that decides whether an item belongs to it.
 * Rules are tried in the order they were passed to the constructor. An item
 * that matches no rule is dropped.
 */
class Demultiplexer<E>(vararg val rules: Rule<E>) {

    /**
     * Receives items from [receiveChannel] until it is closed, sending each
     * item to the destination of the first matching rule only.
     *
     * The destination channels are closed when this function exits, whether
     * the input was exhausted normally or the loop was interrupted by a
     * failure or cancellation, so downstream consumers are never left
     * waiting on a channel that will receive nothing more.
     */
    suspend fun consume(receiveChannel: ReceiveChannel<E>) {
        try {
            for (item in receiveChannel) {
                // Stop at the first rule that accepts the item so it is
                // delivered to exactly one destination.
                rules.firstOrNull { rule -> rule.second(item) }?.first?.send(item)
            }
        } finally {
            closeAll()
        }
    }

    // Closes all the demultiplexed (destination) channels.
    private fun closeAll() {
        rules.forEach { it.first.close() }
    }
}
@ExperimentalCoroutinesApi
fun main() {
    data class Fruit(override val name: String, override val color: String) : Item
    data class Vegetable(override val name: String, override val color: String) : Item

    // Builds the fixed, finite sample of items: a mix of fruits and vegetables.
    fun produceItems(): ArrayList<Item> = arrayListOf(
        Fruit("Apple", "Red"),
        Vegetable("Zucchini", "Green"),
        Fruit("Grapes", "Green"),
        Vegetable("Radishes", "Red"),
        Fruit("Banana", "Yellow"),
        Fruit("Cherries", "Red"),
        Vegetable("Broccoli", "Green"),
        Fruit("Strawberry", "Red"),
        Vegetable("Red bell pepper", "Red")
    )

    runBlocking {
        // One input channel and one output channel per item category.
        val kotlinChannel = Channel<Item>()
        val fruitsChannel = Channel<Item>()
        val vegetablesChannel = Channel<Item>()

        // Producer: push every sample item into the input, then close it.
        launch {
            for (produced in produceItems()) {
                kotlinChannel.send(produced)
            }
            kotlinChannel.close()
        }

        // Route items by their concrete type.
        val typeDemultiplexer = Demultiplexer(
            fruitsChannel to { item: Item -> item is Fruit },
            vegetablesChannel to { item: Item -> item is Vegetable }
        )
        launch { typeDemultiplexer.consume(kotlinChannel) }

        // Consumer of the fruit output.
        launch {
            for (fruit in fruitsChannel) {
                println("${fruit.name} is a fruit")
            }
        }

        // Consumer of the vegetable output.
        launch {
            for (vegetable in vegetablesChannel) {
                println("${vegetable.name} is a vegetable")
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment