Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Created February 16, 2015 17:09
Show Gist options
  • Save pchlupacek/6123466ab7af685c0d3a to your computer and use it in GitHub Desktop.
Save pchlupacek/6123466ab7af685c0d3a to your computer and use it in GitHub Desktop.
/**
* Transforms a process to inputStream. Note that `close` here will cause process to be killed in case
* it did not finish yet. Otherwise cleanups may be called before `close` is executed on InputStream.
* Process will be run after first `read` invocation on InputStream.
* The resulting input stream is NOT thread safe.
* @param source
* @return
*/
def toInputStream(source:TSource[ByteVector])(implicit S:Strategy):InputStream = {
val killSignal = async.signalOf(false)
val task = killSignal.discrete.wye(source)(wye.interrupt).toTask
var current: ByteVector = ByteVector.empty
var finished : Boolean = false
// invoking this will read more from process
// if the process finished returns false, else returns true
def takeMore: Boolean = {
task.attemptRun match {
case -\/(Terminated(End | Kill)) ⇒
finished = true
false
case -\/(rsn) ⇒
finished = true
throw rsn
case \/-(bytes) ⇒
current = current ++ bytes
true
}
}
new InputStream {
def read(): Int = {
def go : Int = {
if (current.nonEmpty) {
val b = current.head
current = current.tail
b.toInt
} else {
if (takeMore) go else -1
}
}
if (!finished) go else -1
}
override def read(b: Array[Byte], off: Int, len: Int): Int = {
@tailrec
def go : Int = {
if (current.size >= len) {
val (taken,rem) = current.splitAt(len)
taken.copyToArray(b,off)
current = rem
taken.size
} else {
if (takeMore) go
else {
//indicates what we have in buffer is all what we can have
val feed = current
feed.copyToArray(b,off)
current = ByteVector.empty
feed.size
}
}
}
if (!finished) go else -1
}
override def close(): Unit = {
@tailrec
def go : Unit = if (takeMore) go else ()
killSignal.set(true).run
if (! finished) go
}
override def skip(n: Long): Long = {
@tailrec
def go(dropped:Long) :Long = {
val rem = n - dropped
if (rem <= 0) dropped
else {
if (rem <= current.size) {
current = current.drop(rem.toInt)
n
} else {
val nd = current.size + dropped
current = ByteVector.empty
if (takeMore) go(nd)
else nd
}
}
}
if (!finished && n > 0) go(0) else 0
}
override def available(): Int = if (!finished) current.size else 0
override def mark(readlimit: Int): Unit = throw new UnsupportedOperationException("Marks are not supported on InputStream from Process")
override def markSupported(): Boolean = false
override def reset(): Unit = throw new UnsupportedOperationException("Marks are not supported on InputStream from Process")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment