Skip to content

Instantly share code, notes, and snippets.

@jasoet
Last active August 26, 2016 19:16
Show Gist options
  • Save jasoet/55ed27b2382a21cbcd341fa521e56222 to your computer and use it in GitHub Desktop.
Save jasoet/55ed27b2382a21cbcd341fa521e56222 to your computer and use it in GitHub Desktop.
Extension function for InputStream that converts it to an Observable<String>
/**
 * Exposes this [InputStream] as an [Observable] that emits the stream's content line by line.
 *
 * Each subscription starts a dedicated thread that reads the stream with a [Scanner] and
 * pushes every line to the subscriber. Reading stops early once the subscriber has
 * unsubscribed. The stream is closed when reading ends, whether it ended normally or not.
 *
 * `onCompleted` is delivered while the stream is still open, so completion callbacks may
 * still query it. Failures (including I/O errors that [Scanner] swallows internally) are
 * delivered through `onError` instead of being lost on the reader thread.
 *
 * @return an observable of the lines read from this stream.
 */
fun InputStream.toObservable(): Observable<String> {
    class IOThread(
        private val inputStream: InputStream,
        private val subscriber: Subscriber<in String>
    ) : Thread() {
        override fun run() {
            try {
                Scanner(InputStreamReader(inputStream)).use { scanner ->
                    // Stop reading as soon as nobody is listening any more.
                    while (!subscriber.isUnsubscribed && scanner.hasNextLine()) {
                        subscriber.onNext(scanner.nextLine())
                    }
                    // Scanner hides IOExceptions and just reports "no more input";
                    // rethrow so a broken stream is not mistaken for a clean end.
                    scanner.ioException()?.let { throw it }
                    // Signal completion before `use` closes the stream.
                    if (!subscriber.isUnsubscribed) {
                        subscriber.onCompleted()
                    }
                }
            } catch (e: Exception) {
                if (!subscriber.isUnsubscribed) {
                    subscriber.onError(e)
                }
            }
        }
    }

    return observable { subscriber ->
        IOThread(this, subscriber).start()
    }
}
/**
 * Demo entry point: runs `ping -c 10 www.google.com`, turns the process's standard output
 * into an observable of lines via `toObservable()`, prints each line as it arrives, and
 * then waits for the process to exit.
 *
 * @param args command-line arguments (unused).
 */
fun main(args: Array<String>) {
    val processBuilder = ProcessBuilder("ping", "-c", "10", "www.google.com").apply {
        // Use the current user's home directory instead of a path that only exists on
        // one particular machine; start() fails if the working directory is missing.
        directory(File(System.getProperty("user.home")))
    }
    val process = processBuilder.start()
    val processInputStream = process.inputStream

    processInputStream.toObservable()
        .doOnCompleted {
            println("Finally Completed ${processInputStream.available()}")
        }
        .forEach(
            { line -> println("Observe => $line") },
            // Without an error handler a stream failure would surface as an
            // OnErrorNotImplementedException on the reader thread.
            { error -> System.err.println("Failed to read process output: $error") }
        )

    process.waitFor()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment