Last active
October 17, 2019 03:14
-
-
Save krishnabhargav/5c9d86d29aa583ff511eec94318806d2 to your computer and use it in GitHub Desktop.
Using Flow to simulate a pull sequence
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.FlowPreview | |
import kotlinx.coroutines.delay | |
import kotlinx.coroutines.flow.* | |
import kotlinx.coroutines.runBlocking | |
import kotlin.random.Random | |
@FlowPreview | |
fun main() = runBlocking { | |
val events = eventStoreProducer() | |
val messages = kafkaProducer() | |
val dynamic = unfoldAsync(0, ::unfoldExample) | |
//Note: need to study more on conflate seems like when its conflated the full sequence ends when the first sequence ends. | |
val merged = sequenceOf(events, messages, dynamic).asFlow().flattenMerge() | |
merged.collect { (m, commit) -> | |
println("Consumer: Received=$m") | |
commit() | |
//readLine() | |
} | |
println("Done") | |
} | |
private suspend fun unfoldExample(it: Int): Pair<Int, Pair<String, () -> Unit>>? { | |
return if(it < 5) { | |
delay(Random.nextLong(1000)) | |
val output = "Dynamic$it" to { println("Done!!!") } | |
(it + 1) to output | |
} | |
else | |
null | |
} | |
typealias CommitOffset = () -> Unit | |
fun eventStoreProducer(): Flow<Pair<String, CommitOffset>> = flow { | |
delay(Random.nextLong(1000)) | |
println("EventProducer: Fetching another event") | |
emit("Event 1" to { println("Committing event 1") }) | |
delay(Random.nextLong(1000)) | |
println("EventProducer: Fetching another event") | |
emit("Event 2" to { println("Committing event 2") }) | |
} | |
fun kafkaProducer(): Flow<Pair<String, CommitOffset>> = flow { | |
delay(Random.nextLong(1000)) | |
println("KafkaProducer: Fetching another message") | |
emit("Message 1" to { println("Committing offset 1") }) | |
delay(Random.nextLong(1000)) | |
println("KafkaProducer: Fetching another message") | |
emit("Message 2" to { println("Committing offset 2") }) | |
} | |
//Option type in Kotlin is a pain to implement so will just use nullable for this example. | |
//sealed class Option<T> { | |
// object None : Option<Nothing>() | |
// data class Some<T>(val a: T) : Option<T>() | |
//} | |
fun <T, S> unfoldAsync(initialState: S, unfolder: suspend (S) -> Pair<S, T>?): Flow<T> = flow { | |
var current = initialState | |
while (true) { | |
val result = unfolder(current) | |
if (result!=null) { | |
val (nextState, value) = result | |
emit(value) | |
current = nextState | |
} else return@flow | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment