Created
February 21, 2017 15:22
-
-
Save WadeWaldron/d64033e983249d222238aa0ffa000296 to your computer and use it in GitHub Desktop.
Chunked Responses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package chunkedresponses | |
import akka.actor.{Actor, ActorRef} | |
import spray.http.HttpHeaders.RawHeader | |
import spray.http._ | |
object ChunkedResponder { | |
case class Chunk(data: HttpData) | |
case object Shutdown | |
case object Ack | |
} | |
class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor { | |
import ChunkedResponder._ | |
def receive:Receive = { | |
case chunk: Chunk => | |
responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack)) | |
context.become(chunking) | |
case Shutdown => | |
responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack)) | |
context.stop(self) | |
} | |
def chunking:Receive = { | |
case chunk: Chunk => | |
responder.forward(MessageChunk(chunk.data).withAck(Ack)) | |
case Shutdown => | |
responder.forward(ChunkedMessageEnd().withAck(Ack)) | |
context.stop(self) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package chunkedresponses | |
import akka.actor.ActorRef | |
import akka.util.Timeout | |
import akka.pattern.ask | |
import play.api.libs.iteratee.{Done, Step, Input, Iteratee} | |
import spray.http.HttpData | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] { | |
import ChunkedResponder._ | |
private implicit val timeout = Timeout(30.seconds) | |
def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = { | |
def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this)) | |
def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match { | |
case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e)) | |
case Input.Empty => waitForAck(Future.successful(Unit)) | |
case Input.EOF => | |
chunkedResponder ! Shutdown | |
Done(Unit, Input.EOF) | |
} | |
folder(Step.Cont(step)) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ActorContext, ActorRefFactory, Props} | |
import play.api.libs.iteratee.Enumerator | |
import spray.http.{HttpData, ContentType} | |
import spray.routing.RequestContext | |
import scala.concurrent.ExecutionContext | |
package object chunkedresponses { | |
implicit class ChunkedRequestContext(requestContext: RequestContext) { | |
def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData]) | |
(implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory) { | |
val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder))) | |
val iteratee = new ChunkIteratee(chunkedResponder) | |
enumerator.run(iteratee) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment