Last active
August 29, 2015 13:56
-
-
Save sadache/8804038 to your computer and use it in GitHub Desktop.
A naive, potentially buggy and largely improvable implementation of Enumerator[Array[Byte]] to InputStream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException
import java.io.InputStream
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import play.api.libs.iteratee.{ Enumerator, Iteratee }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.{ Success, Failure }
/**
 * Adapts an `Enumerator[Array[Byte]]` to a blocking `java.io.InputStream`.
 *
 * The enumerator starts being consumed as soon as this method is called. Its chunks are
 * buffered in a bounded queue; once `bufferSize` chunks are pending, the enumerator is
 * suspended (its fold step returns an uncompleted `Future`) until the reader takes a chunk
 * or the stream is closed.
 *
 * Reader-side state is not synchronized, so the returned stream must be read by one thread
 * at a time. `close()` may be called from any thread.
 *
 * @param chunks     source of the byte chunks
 * @param bufferSize maximum number of chunks buffered ahead of the reader; must be positive
 * @return a stream yielding the concatenated chunks; its `read` methods throw an
 *         `IOException` once the buffered data is exhausted if the enumerator failed
 * @throws IllegalArgumentException if `bufferSize` is not positive
 */
def toInputStream(chunks: Enumerator[Array[Byte]], bufferSize: Int): InputStream = {
  require(bufferSize > 0, "bufferSize must be positive")

  new InputStream {
    // Latest promise the producer is (or was) parked on. It is never reset: completing an
    // already-completed promise is a no-op, which avoids a reset racing with a new promise.
    @volatile private var waiting: Option[Promise[(Unit, Boolean)]] = None

    // Improvement: replace BlockingQueue with a ring array buffer and block read manually.
    // One slot beyond bufferSize is reserved for the end-of-stream marker (None), so that
    // enqueueing the marker can never fail with "Queue full".
    private val queue = new ArrayBlockingQueue[Option[Array[Byte]]](bufferSize + 1)

    @volatile private var closed = false
    // Error that terminated the enumerator, if any; written before the end marker is queued.
    @volatile private var failure: Option[Throwable] = None
    private val availableBytes = new AtomicInteger(0)

    // Reader-only state.
    private var remaining: Option[Array[Byte]] = None // unread tail of the last taken chunk
    private var done = false                          // end marker has been taken

    // Producer. Invariant: a fold step only runs while fewer than bufferSize chunks are
    // queued, so queue.add below always has room. This statement must stay after all field
    // definitions because the callbacks may run on another thread during construction.
    (chunks |>>> Iteratee.fold2[Array[Byte], Unit](()) { case (_, chunk) =>
      if (closed) Future.successful(((), true))
      else {
        queue.add(Some(chunk))
        availableBytes.addAndGet(chunk.length)
        if (queue.size < bufferSize) Future.successful(((), closed))
        else {
          // Buffer full: park until the reader drains a chunk or the stream is closed.
          val gate = Promise[(Unit, Boolean)]()
          waiting = Some(gate)
          // Re-check after publishing the promise in case the reader drained meanwhile.
          tryFreeEnumerator()
          gate.future
        }
      }
    }).onComplete {
      case Success(_) => queue.add(None)
      case Failure(e) =>
        // Hand the error to the reader instead of leaving it blocked in queue.take().
        failure = Some(e)
        queue.add(None)
    }

    /** Resumes a parked producer if there is room in the buffer or the stream is closed. */
    private def tryFreeEnumerator(): Unit =
      if (closed || queue.size < bufferSize)
        waiting.foreach(_.tryComplete(Success(((), closed))))

    /**
     * Copies at most `max` bytes of `chunk` into `dest` at `destPos`, stores the uncopied
     * tail (if any) in `remaining`, and returns the number of bytes copied.
     */
    private def copyChunk(chunk: Array[Byte], dest: Array[Byte], destPos: Int, max: Int): Int = {
      val n = math.min(chunk.length, max)
      System.arraycopy(chunk, 0, dest, destPos, n)
      remaining = if (n < chunk.length) Some(chunk.slice(n, chunk.length)) else None
      n
    }

    /** Marks the stream closed; the producer stops folding at its next step or wake-up. */
    override def close(): Unit = {
      closed = true
      tryFreeEnumerator()
    }

    override def read(b: Array[Byte]): Int = read(b, 0, b.length)

    /** Number of buffered bytes not yet read; clamped because the counter lags the queue. */
    override def available(): Int = math.max(availableBytes.get(), 0)

    /** Reads one byte as an unsigned value in 0..255, or -1 at end of stream. */
    override def read(): Int = {
      val one = new Array[Byte](1)
      if (read(one, 0, 1) < 0) -1 else one(0) & 0xff
    }

    /**
     * Reads up to `len` bytes into `b` starting at `off`. Blocks until at least one byte is
     * available, then also drains whatever further chunks are already queued.
     *
     * @return the number of bytes read, 0 if `len` is 0, or -1 at end of stream
     * @throws IOException if the enumerator failed and no buffered data is left
     */
    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException()
      if (len == 0) 0
      else {
        var copied = 0
        remaining.foreach(leftover => copied += copyChunk(leftover, b, off, len))
        // Block only while nothing has been copied yet; afterwards take only what is ready.
        while (copied < len && !done && (copied == 0 || !queue.isEmpty)) {
          queue.take() match {
            case Some(chunk) =>
              tryFreeEnumerator()
              copied += copyChunk(chunk, b, off + copied, len - copied)
            case None =>
              done = true
          }
        }
        if (copied > 0) {
          availableBytes.addAndGet(-copied)
          copied
        } else {
          failure.foreach(e => throw new IOException("Enumerator feeding the InputStream failed", e))
          -1
        }
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment