Created
February 16, 2015 17:09
-
-
Save pchlupacek/6123466ab7af685c0d3a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Exposes a process of `ByteVector` chunks as a blocking `java.io.InputStream`.
 *
 * The process is not run when this method is called; it is stepped on demand
 * the first time `read`, `skip` or `close` needs more data.
 *
 * Calling `close` raises a kill signal that interrupts the process if it has not
 * finished yet, and then steps it until it terminates. Otherwise the process
 * cleanups may already have run before `close` is executed on the InputStream.
 *
 * If the process fails, the failure is rethrown from the call that observed it;
 * afterwards the stream reports end of stream.
 *
 * The resulting input stream is NOT thread safe: its state is kept in plain,
 * unsynchronized vars.
 *
 * @param source process producing the chunks of bytes to be read
 * @param S      strategy used by the kill signal and by the interrupting wye
 * @return an InputStream yielding the bytes emitted by `source`; `mark`/`reset`
 *         are not supported
 */
def toInputStream(source: TSource[ByteVector])(implicit S: Strategy): InputStream = {
  // Set to `true` by `close` to interrupt `source`.
  val killSignal = async.signalOf(false)
  val task = killSignal.discrete.wye(source)(wye.interrupt).toTask

  // Bytes already pulled from the process but not yet handed to the reader.
  var current: ByteVector = ByteVector.empty
  // Becomes true once the process has terminated (normally, killed, or with a failure).
  var finished: Boolean = false

  // Pulls one more chunk from the process and appends it to `current`.
  // Returns false if the process finished, true otherwise.
  // Rethrows any failure other than normal termination / kill.
  def takeMore: Boolean =
    task.attemptRun match {
      case -\/(Terminated(End | Kill)) =>
        finished = true
        false
      case -\/(rsn) =>
        finished = true
        throw rsn
      case \/-(bytes) =>
        current = current ++ bytes
        true
    }

  new InputStream {

    def read(): Int = {
      @tailrec
      def next: Int =
        if (current.nonEmpty) {
          val b = current.head
          current = current.tail
          // InputStream.read() must return 0..255. A plain `toInt` sign-extends,
          // turning 0xFF into -1, which the caller would take for end of stream.
          b & 0xff
        } else if (takeMore) next
        else -1

      if (finished) -1 else next
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      // Validate before touching the process so a bad call consumes nothing.
      if (off < 0 || len < 0 || len > b.length - off)
        throw new IndexOutOfBoundsException(s"off=$off, len=$len, b.length=${b.length}")

      // Moves the first `n` buffered bytes into `b` and returns how many were moved.
      def deliver(n: Int): Int = {
        val (taken, rest) = current.splitAt(n)
        taken.copyToArray(b, off)
        current = rest
        taken.size
      }

      // Keeps pulling until `len` bytes are buffered or the process is exhausted.
      @tailrec
      def fill: Int =
        if (current.size >= len) deliver(len)
        else if (takeMore) fill
        else if (current.isEmpty) -1 // nothing left at all: end of stream, not "0 bytes read"
        else deliver(current.size)   // what we have in the buffer is all we can ever have

      if (len == 0) 0
      else if (finished) -1
      else fill
    }

    override def close(): Unit = {
      // Step the process until it terminates; the drained bytes will never be
      // read, so drop them instead of accumulating them in the buffer.
      @tailrec
      def drain: Unit =
        if (takeMore) {
          current = ByteVector.empty
          drain
        } else ()

      killSignal.set(true).run
      if (!finished) drain
      current = ByteVector.empty
    }

    override def skip(n: Long): Long = {
      @tailrec
      def go(dropped: Long): Long = {
        val rem = n - dropped
        if (rem <= 0) dropped
        else if (rem <= current.size) {
          // `rem` fits in the buffer, hence in an Int.
          current = current.drop(rem.toInt)
          n
        } else {
          // Buffer is too small: discard all of it and try to pull more.
          val nd = current.size + dropped
          current = ByteVector.empty
          if (takeMore) go(nd) else nd
        }
      }

      if (!finished && n > 0) go(0) else 0
    }

    // Only reports bytes already buffered; never steps the process.
    override def available(): Int = if (!finished) current.size else 0

    override def mark(readlimit: Int): Unit =
      throw new UnsupportedOperationException("Marks are not supported on InputStream from Process")

    override def markSupported(): Boolean = false

    override def reset(): Unit =
      throw new UnsupportedOperationException("Marks are not supported on InputStream from Process")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment