Skip to content

Instantly share code, notes, and snippets.

@WadeWaldron
Created February 21, 2017 15:22
Show Gist options
  • Save WadeWaldron/d64033e983249d222238aa0ffa000296 to your computer and use it in GitHub Desktop.
Save WadeWaldron/d64033e983249d222238aa0ffa000296 to your computer and use it in GitHub Desktop.
Chunked Responses
package chunkedresponses
import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._
/** Messages exchanged with the `ChunkedResponder` actor. */
object ChunkedResponder {

  /** Carries one piece of response body data. */
  case class Chunk(data: HttpData)

  /** Tells the actor to finish the response and stop itself. */
  case object Shutdown

  /** Token the actor attaches (via `withAck`) to every message it passes on. */
  case object Ack
}
/**
 * Actor that passes body data on to `responder` as a chunked HTTP response.
 *
 * It starts in `receive`; the first `Chunk` opens the chunked response and
 * switches the behaviour to `chunking`, which handles all later chunks.
 * Every message handed to `responder` is forwarded and tagged with
 * `withAck(Ack)`.
 *
 * @param contentType content type of the response
 * @param responder   actor that receives the response parts
 */
class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._

  /** Initial behaviour: nothing has been sent to `responder` yet. */
  def receive: Receive = {
    case Chunk(data) =>
      // The first chunk travels inside the message that starts the chunked response.
      val firstPart = HttpResponse(entity = HttpEntity(contentType, data))
      responder.forward(ChunkedResponseStart(firstPart).withAck(Ack))
      context.become(chunking)

    case Shutdown =>
      // No chunk was ever received: send a single response that only carries
      // the content type as a raw header, then stop.
      val contentTypeHeader = RawHeader("Content-Type", contentType.value)
      val emptyResponse = HttpResponse(headers = List(contentTypeHeader))
      responder.forward(emptyResponse.withAck(Ack))
      context.stop(self)
  }

  /** Behaviour after the chunked response has been started. */
  def chunking: Receive = {
    case Chunk(data) =>
      val part = MessageChunk(data)
      responder.forward(part.withAck(Ack))

    case Shutdown =>
      // Close the chunked response before stopping.
      val end = ChunkedMessageEnd()
      responder.forward(end.withAck(Ack))
      context.stop(self)
  }
}
package chunkedresponses
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
/**
 * Iteratee that sends every `HttpData` element it is fed to `chunkedResponder`
 * as a `Chunk` (using ask) and only continues once that ask has completed.
 * On end of input it sends `Shutdown` and finishes.
 *
 * @param chunkedResponder actor that receives the `Chunk` and `Shutdown` messages
 */
class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._

  // Upper bound on how long a single ask (`?`) waits for its reply.
  private implicit val timeout: Timeout = Timeout(30.seconds)

  /**
   * Presents this iteratee to `folder` as a `Step.Cont` whose continuation
   * handles the next input.
   *
   * @param folder function applied to the current step
   * @param ec     execution context used to map over the ask future
   * @return the result of applying `folder` to the continuation step
   */
  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    // Resume with this same iteratee once `future` has completed successfully.
    // NOTE(review): if `future` fails (e.g. the ask times out) the iteratee fails
    // and no Shutdown is sent from here - confirm that callers clean up.
    def waitForAck(future: Future[Any]): Iteratee[HttpData, Unit] =
      Iteratee.flatten(future.map(_ => this))

    def step(input: Input[HttpData]): Iteratee[HttpData, Unit] = input match {
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
      // `()` is the unit value; `Unit` would refer to the companion object.
      case Input.Empty => waitForAck(Future.successful(()))
      case Input.EOF =>
        // Sent with `!`, so no reply is awaited for the shutdown.
        chunkedResponder ! Shutdown
        Done((), Input.EOF)
    }

    folder(Step.Cont(step))
  }
}
import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext
import scala.concurrent.ExecutionContext
package object chunkedresponses {

  /** Adds `completeChunked` to a spray `RequestContext`. */
  implicit class ChunkedRequestContext(requestContext: RequestContext) {

    /**
     * Streams `enumerator` to the request's responder as a chunked response.
     *
     * Creates a `ChunkedResponder` actor for this request and runs the
     * enumerator into a `ChunkIteratee` that feeds it. The call returns
     * immediately; the streaming happens asynchronously.
     *
     * @param contentType      content type of the response
     * @param enumerator       source of the body data
     * @param executionContext context on which the failure callback runs
     * @param actorRefFactory  factory used to create the responder actor
     */
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory): Unit = {
      val chunkedResponder =
        actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)

      // If the stream fails, the iteratee never reaches EOF and so never sends
      // Shutdown; without this the responder actor would stay alive forever.
      // PoisonPill is an ordinary message, so it is safe to send from a future
      // callback and is processed after any chunks already in the mailbox.
      enumerator.run(iteratee).failed.foreach { _ =>
        chunkedResponder ! akka.actor.PoisonPill
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment