Last active
July 2, 2016 22:29
-
-
Save soywiz/03bf0e11a591e9739ac73a4290d0c739 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Depends on code defined here (related to EventLoop and promise implementation):
// https://gist.github.com/soywiz/a2a26b9375f1f85048ab3ce7ffdb5501
// More examples here: https://github.com/kotlin-es/coroutine-examples/blob/master/src/example-async-generator.kt
import java.util.* | |
/**
 * Demo entry point: produces a small asynchronous stream of [User]s and
 * consumes it in several ways inside the event loop.
 */
fun main(args: Array<String>) = EventLoop.main {
    // Asynchronous producer: every call starts a fresh generator that emits four users.
    fun readUsers() = generateAsync<User> {
        // This could read data results from disk or a socket
        for (n in 0 until 4) {
            waitAsync(0.3.seconds).await()
            emit(User(name = "test$n", age = n * 5))
        }
    }

    async<Unit> {
        // Consumer: explicit eachAsync + await on the completion promise.
        readUsers().eachAsync { user ->
            println(user)
        }.await()
        println("----")

        // Consumer: `each` is the eachAsync + await alias available inside the async block.
        readUsers().each {
            println(it)
        }
        println("----")

        // Both streams are created before either sum is awaited.
        val sumPromise = readUsers().mapAsync { it.age }.sumAsync()
        val sumGreatOrEqualThan10Promise = readUsers()
            .filterAsync { it.age >= 10 }
            .mapAsync { it.age }
            .sumAsync()

        println("Parallelized:")
        println("All ages summed: " + sumPromise.await())
        // The filter above is inclusive (>= 10), so the label says so too.
        println("All ages (greater than or equal to 10) summed: " + sumGreatOrEqualThan10Promise.await())
    }
}
/** Sample record emitted by the asynchronous generator: a user's [name] and [age]. */
data class User(
    val name: String,
    val age: Int
)
/**
 * Builds an [AsyncStream] driven by a generator coroutine.
 *
 * A fresh [AsyncStreamController] is created, [routine] is instantiated with it
 * as receiver, and the resulting continuation is resumed once to start the
 * generator before the controller's stream is handed back to the caller.
 *
 * @param routine generator body; it runs with the controller as receiver.
 * @return the stream owned by the controller the generator runs against.
 */
fun <T> generateAsync(coroutine routine: AsyncStreamController<T>.() -> Continuation<Unit>): AsyncStream<T> {
    val streamController = AsyncStreamController<T>()
    // Start the generator immediately.
    routine(streamController).resume(Unit)
    return streamController.stream
}
/**
 * Receiver object for generator coroutines started via `generateAsync`.
 *
 * It owns an [AsyncStream.Emitter] and exposes the emitter's [stream] so
 * consumers can listen to the values the generator emits.
 */
class AsyncStreamController<T> {
    private val emitter = AsyncStream.Emitter<T>()
    val stream = emitter.stream

    /** Pushes [value] into the stream owned by this controller. */
    fun emit(value: T) {
        emitter.emit(value)
    }

    /**
     * Suspends until the receiver promise settles: resumes [c] with the value on
     * resolution, or with the failure on rejection.
     *
     * The type parameter is named `R` so that it does not shadow the class's `T`;
     * any promise type can be awaited, not only promises of stream elements.
     */
    suspend fun <R> Promise<R>.await(c: Continuation<R>) {
        this.then(
            resolved = { c.resume(it) },
            rejected = { c.resumeWithException(it) }
        )
    }

    /** Shorthand for `eachAsync(handler).await()` usable inside the coroutine body. */
    suspend fun AsyncStream<T>.each(handler: (T) -> Unit, c: Continuation<Unit>) {
        this.eachAsync(handler).await(c)
    }

    // NOTE(review): presumably invoked by the coroutine runtime when the body
    // completes normally; it closes the emitter, which resolves the stream's promise.
    operator fun handleResult(v: Unit, c: Continuation<Nothing>) {
        emitter.close()
    }

    // NOTE(review): presumably invoked by the coroutine runtime when the body
    // throws; the failure is forwarded by rejecting the stream's deferred.
    operator fun handleException(t: Throwable, c: Continuation<Nothing>) {
        emitter.deferred.reject(t)
    }
}
/**
 * Push-based stream of values with a completion promise.
 *
 * Values are pushed through an [Emitter]; listeners registered with
 * [listenAsync]/[eachAsync] receive every value and get back a promise that
 * settles when the emitter is closed (resolved) or its deferred is rejected.
 *
 * Values emitted while no listener is registered are buffered and delivered as
 * soon as the first listener attaches, so a producer that runs ahead of its
 * consumer does not lose data.
 */
class AsyncStream<T> {
    typealias Handler = (T) -> Unit

    private val deferred = Promise.Deferred<Unit>()
    private val handlers = arrayListOf<Handler>()

    // Values emitted while nobody was listening; exposed through Emitter.buffer.
    private val pending = LinkedList<T>()

    /** Delivers every buffered value, in emission order, to all registered handlers. */
    private fun flushPending() {
        if (handlers.isEmpty()) return
        while (pending.isNotEmpty()) {
            val item = pending.removeFirst()
            for (handler in handlers) handler(item)
        }
    }

    /** Producer side of an [AsyncStream]: creates the stream and feeds it. */
    class Emitter<T> {
        val stream = AsyncStream<T>()
        val deferred = stream.deferred
        val buffer = stream.pending

        /** Queues [value] and delivers it immediately if at least one listener exists. */
        fun emit(value: T) {
            buffer += value
            stream.flushPending()
        }

        /** Marks the stream as finished by resolving its completion promise. */
        fun close() {
            deferred.resolve(Unit)
        }
    }

    /** Alias of [listenAsync]. */
    fun eachAsync(handler: Handler) = listenAsync(handler)

    /**
     * Registers [handler] for every value of this stream.
     *
     * Values that were emitted before any listener existed are delivered to the
     * handler right away, before this function returns.
     *
     * @return the stream's completion promise.
     */
    fun listenAsync(handler: Handler): Promise<Unit> {
        handlers += handler
        // Without this, buffered values would only surface on a later emit()
        // and would be lost if the producer had already finished.
        flushPending()
        return deferred.promise
    }

    /**
     * Listens to this stream with [onValue] and mirrors its completion onto
     * [target]: closes it on success and rejects it on failure, so downstream
     * consumers never wait forever on a failed upstream.
     */
    private fun <R> pipeInto(target: Emitter<R>, onValue: (T) -> Unit) {
        listenAsync(onValue).then(
            resolved = { target.close() },
            rejected = { target.deferred.reject(it) }
        )
    }

    /** Returns a stream carrying `map(value)` for every value of this stream. */
    fun <T2> mapAsync(map: (T) -> T2): AsyncStream<T2> {
        val emitter = AsyncStream.Emitter<T2>()
        pipeInto(emitter) { emitter.emit(map(it)) }
        return emitter.stream
    }

    /** Returns a stream carrying only the values for which [filter] returns true. */
    fun filterAsync(filter: (T) -> Boolean): AsyncStream<T> {
        val emitter = AsyncStream.Emitter<T>()
        pipeInto(emitter) { if (filter(it)) emitter.emit(it) }
        return emitter.stream
    }

    /**
     * Folds every value into an accumulator starting at [initial].
     *
     * @return a promise resolved with the final accumulator when this stream
     * completes, or rejected with the upstream failure.
     */
    fun <R> foldAsync(initial: R, fold: (R, T) -> R): Promise<R> {
        val out = Promise.Deferred<R>()
        var result = initial
        listenAsync {
            result = fold(result, it)
        }.then(
            resolved = { out.resolve(result) },
            rejected = { out.reject(it) }
        )
        return out.promise
    }
}
//fun <T : Number> AsyncStream<T>.sumAsync(): Promise<T> { | |
/** Adds up every value of an integer stream; the promise resolves with the total (0 if nothing was emitted). */
fun AsyncStream<Int>.sumAsync(): Promise<Int> =
    foldAsync(0) { total, next -> total + next }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment