import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.File
import java.io.IOException
import kotlin.coroutines.resumeWithException

/** Matches one or more whitespace characters; compiled once instead of on every [startProcess] call. */
private val WHITESPACE = "\\s+".toRegex()

/**
 * Starts an external process for [cmd] in [workingDir] and returns it.
 *
 * The command line is tokenized on runs of whitespace only; quoting and escaping are
 * not interpreted, so an argument cannot itself contain whitespace.
 *
 * If the coroutine is cancelled before the process is handed to the caller, the
 * process is destroyed.
 *
 * @param cmd the command and its arguments, separated by whitespace.
 * @param workingDir the directory the process is started in.
 * @return the started [Process].
 * @throws Throwable whatever [ProcessBuilder.start] throws (e.g. [IOException] when the
 * program cannot be run) is forwarded to the caller.
 */
@ExperimentalCoroutinesApi
suspend fun startProcess(cmd: String, workingDir: File): Process = suspendCancellableCoroutine { cont ->
    try {
        // trim() + "\\s+" prevents empty-string arguments when the command has leading,
        // trailing or repeated whitespace. A blank command still yields [""], so it fails
        // inside start() exactly as it did before.
        val process = ProcessBuilder(cmd.trim().split(WHITESPACE))
            .directory(workingDir)
            .start()
        cont.invokeOnCancellation {
            println("Destroying $process due to cancellation before started.")
            process.destroy()
        }
        // The lambda runs if the continuation is cancelled while the value is in flight.
        cont.resume(process) { process.destroy() }
    } catch (e: Throwable) {
        cont.resumeWithException(e)
    }
}

/**
 * Waits for this process to terminate.
 *
 * Note that this blocks the calling thread inside [Process.waitFor]; call it from a
 * dispatcher where blocking is acceptable (e.g. [Dispatchers.IO]).
 *
 * If the coroutine is cancelled while waiting, the process is destroyed.
 *
 * @throws IOException if the process terminates with a non-zero exit code.
 */
@ExperimentalCoroutinesApi
suspend fun Process.wait(): Unit = suspendCancellableCoroutine { cont ->
    // Registered before blocking so that a cancellation can destroy the process and
    // thereby unblock waitFor() below.
    cont.invokeOnCancellation {
        println("Destroying $this due to cancellation.")
        destroy()
    }
    try {
        when (val returnCode = waitFor()) {
            0 -> cont.resume(Unit) { destroy() }
            else -> cont.resumeWithException(
                IOException("Process $this exits with non zero code $returnCode.")
            )
        }
    } catch (e: Throwable) {
        cont.resumeWithException(e)
    }
}

/** A single line of text produced by a process on one of its output streams. */
interface ProcessOutLine {
    /** The text of the line. */
    val line: String
}

/** A line read from the standard output of a process. */
data class StdOutLine(override val line: String) : ProcessOutLine

/** A line read from the standard error of a process. */
data class StdErrLine(override val line: String) : ProcessOutLine

/**
 * Runs [cmd] in [workingDir] and emits every line the process writes to stdout
 * ([StdOutLine]) and stderr ([StdErrLine]).
 *
 * The process is started when the returned flow is collected. Both streams are read
 * concurrently on [Dispatchers.IO], so the relative order of stdout and stderr lines is
 * not guaranteed. Both streams are drained to end-of-stream before the exit code is
 * checked, so output is delivered even when the process fails.
 *
 * The `suspend` modifier is kept for compatibility with existing callers, although
 * building the flow does not itself suspend.
 *
 * @param cmd the command and its arguments, separated by whitespace (see [startProcess]).
 * @param workingDir the directory the process is started in; defaults to `File(".")`.
 * @return a cold flow of output lines; collection fails with [IOException] if the
 * process exits with a non-zero code.
 */
@ExperimentalCoroutinesApi
@FlowPreview
suspend fun runCommand(cmd: String, workingDir: File = File(".")): Flow<ProcessOutLine> = channelFlow {
    withContext(Dispatchers.IO) {
        val process = startProcess(cmd, workingDir)
        try {
            // useLines closes each reader when its stream is exhausted or reading fails.
            val stdoutReader = launch {
                process.inputStream.bufferedReader().useLines { lines ->
                    for (line in lines) {
                        send(StdOutLine(line))
                    }
                }
            }
            val stderrReader = launch {
                process.errorStream.bufferedReader().useLines { lines ->
                    for (line in lines) {
                        send(StdErrLine(line))
                    }
                }
            }
            // Drain both streams before checking the exit code; otherwise a non-zero exit
            // would cancel the readers and drop trailing output.
            joinAll(stdoutReader, stderrReader)
            process.wait()
        } finally {
            // No-op once the process has exited. On cancellation or failure this kills
            // the process, which ends the streams and lets the blocked readers finish.
            process.destroy()
        }
    }
}

/**
 * Demo entry point: prints the output of `java -version`, giving up after 500 ms.
 */
@ExperimentalCoroutinesApi
@FlowPreview
suspend fun main(): Unit = coroutineScope {
    withTimeout(500) {
        runCommand("java -version").collect {
            println(it)
            //delay(1000)
        }
    }
}