Skip to content

Instantly share code, notes, and snippets.

@jcaraballo
Last active July 1, 2025 08:00
Show Gist options
  • Save jcaraballo/573c0034d080efb3bfc54d6470e9c75b to your computer and use it in GitHub Desktop.
Save jcaraballo/573c0034d080efb3bfc54d6470e9c75b to your computer and use it in GitHub Desktop.
InputStream operations
import org.apache.commons.io.input.TeeInputStream
import java.io.InputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import kotlin.use
/**
* Returns a pair of InputStream that give access to the same bytes as `this`. They need to be consumed from different
* threads. Both need to be consumed at the same time, i.e. if one stops consuming, the other one will hang.
*/
fun InputStream.duplicate(): Pair<InputStream, InputStream> {
val pipedOutputStream = PipedOutputStream()
val pipedInputStream = PipedInputStream(pipedOutputStream)
val mainInputStream = TeeInputStream(this, pipedOutput, true)
return Pair(mainInputStream, pipedInputStream)
}
fun <T, U> InputStream.fork(forInputStream1: (InputStream) -> T, forInputStream2: (InputStream) -> U): Pair<T, U> {
val (main, dependent) = this.duplicate()
return Executors.newSingleThreadExecutor().use { executor ->
val future: Future<U> = executor.submit(Callable { forInputStream2(dependent) })
val t = main.use(forInputStream1)
val u = future.get()
t to u
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment