Last active
July 1, 2025 08:00
-
-
Save jcaraballo/573c0034d080efb3bfc54d6470e9c75b to your computer and use it in GitHub Desktop.
InputStream operations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.commons.io.input.TeeInputStream | |
import java.io.InputStream | |
import java.io.PipedInputStream | |
import java.io.PipedOutputStream | |
import java.util.concurrent.Callable | |
import java.util.concurrent.Executors | |
import java.util.concurrent.Future | |
import kotlin.use | |
/**
 * Splits this stream into a pair of streams that expose the same bytes.
 *
 * The first element of the pair is the "main" stream: it reads from `this` and copies every byte it reads into a
 * pipe. The second element is the read end of that pipe, so it only ever sees bytes that the main stream has
 * already read.
 *
 * The two streams must be consumed concurrently, from different threads. The pipe has a bounded buffer: if the
 * second stream stops being read, the main stream eventually blocks on a full pipe, and the second stream blocks
 * whenever the main stream has not produced more data yet.
 *
 * Closing the main stream also closes the write end of the pipe, which is what signals end-of-stream to the
 * second stream.
 *
 * @return the main stream first and the piped (dependent) stream second.
 */
fun InputStream.duplicate(): Pair<InputStream, InputStream> {
    val pipedOutputStream = PipedOutputStream()
    val pipedInputStream = PipedInputStream(pipedOutputStream)
    // The `true` flag makes TeeInputStream close the pipe's write end when the main stream is closed.
    val mainInputStream = TeeInputStream(this, pipedOutputStream, true)
    return Pair(mainInputStream, pipedInputStream)
}
/**
 * Runs two consumers concurrently over the same bytes of this stream and returns both results.
 *
 * [forInputStream1] runs on the calling thread against the main stream returned by [duplicate];
 * [forInputStream2] runs on a dedicated worker thread against the dependent (piped) stream. Both streams are
 * closed once their consumer is done.
 *
 * The dependent stream only receives bytes that [forInputStream1] actually reads, so if the first consumer stops
 * early the second one sees a truncated stream. If the second consumer stops early (or throws), the remaining
 * bytes of the dependent stream are read and discarded so that the first consumer is never left blocked on a full
 * pipe.
 *
 * @param forInputStream1 consumer of the main stream, executed on the calling thread.
 * @param forInputStream2 consumer of the dependent stream, executed on a worker thread.
 * @return the result of [forInputStream1] paired with the result of [forInputStream2].
 * @throws java.util.concurrent.ExecutionException if [forInputStream2] throws; the original failure is its cause.
 */
fun <T, U> InputStream.fork(forInputStream1: (InputStream) -> T, forInputStream2: (InputStream) -> U): Pair<T, U> {
    val (main, dependent) = this.duplicate()
    return Executors.newSingleThreadExecutor().use { executor ->
        val future: Future<U> = executor.submit(
            Callable<U> {
                dependent.use { stream ->
                    try {
                        forInputStream2(stream)
                    } finally {
                        // The worker thread outlives this task, so the pipe cannot detect that its reader is
                        // gone. Keep draining until end-of-stream so the writer side never blocks forever.
                        try {
                            val scratch = ByteArray(DEFAULT_BUFFER_SIZE)
                            while (stream.read(scratch) != -1) {
                                // discard
                            }
                        } catch (ignored: java.io.IOException) {
                            // Best effort: the consumer may already have closed the stream, or the pipe broke.
                        }
                    }
                }
            },
        )
        // Closing `main` closes the pipe's write end, which lets the worker reach end-of-stream.
        val t = main.use(forInputStream1)
        val u = future.get()
        t to u
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment