Skip to content

Instantly share code, notes, and snippets.

@whysoserious
Last active August 29, 2015 14:09
Show Gist options
  • Save whysoserious/0866f25f414335c17c80 to your computer and use it in GitHub Desktop.
Save whysoserious/0866f25f414335c17c80 to your computer and use it in GitHub Desktop.
Chunked responses from spray.io server
import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}
/**
 * Demo entry point: starts a spray server on localhost:8080 exposing two GET routes.
 *
 *  - `/stream`       hands the request to a streaming actor fed with three string chunks
 *  - `/empty-stream` hands the request to a streaming actor fed with no chunks at all
 *
 * Each request is completed by a freshly created actor built from the `StreamingActor`
 * helper factories; the route itself never completes the request directly.
 */
object ChunkedSpray extends App with SimpleRoutingApp {

  import StreamingActor.{fromByteArray, fromString}

  // Actor system required implicitly by `startServer`.
  implicit val actorSystem: ActorSystem = ActorSystem()

  // Sample payloads served by the two routes below.
  lazy val stream: Stream[String] = Stream("first", "second", "third")
  lazy val emptyStream: Stream[Array[Byte]] = Stream.empty[Array[Byte]]

  startServer(interface = "localhost", port = 8080) {
    get {
      path("stream") { ctx =>
        // one actor per request, streaming the string chunks
        actorRefFactory.actorOf(fromString(stream, ctx))
      } ~
      path("empty-stream") { ctx =>
        // one actor per request, given an empty chunk source
        actorRefFactory.actorOf(fromByteArray(emptyStream, ctx))
      }
    }
  }
}
import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}
/**
 * Companion of the `StreamingActor` class: `Props` factories for the supported chunk
 * representations, plus the private messages the actor uses to drive itself.
 *
 * Every factory converts its elements to `HttpData` and delegates to [[fromHttpData]].
 */
object StreamingActor {

  /**
   * Props for an actor streaming `iterable` as chunks; each string is converted with
   * `HttpData(String)`, i.e. with HttpData's default string encoding.
   *
   * @param iterable chunk payloads, in the order they should be sent
   * @param ctx      request context whose responder receives the chunks
   */
  def fromString(iterable: Iterable[String], ctx: RequestContext): Props =
    fromHttpData(iterable.map(HttpData(_)), ctx)

  /**
   * Same as [[fromString]], but every string is encoded with the given `charset`.
   *
   * @param iterable chunk payloads, in the order they should be sent
   * @param ctx      request context whose responder receives the chunks
   * @param charset  charset used to turn each string into bytes
   */
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props =
    // The charset must be passed on explicitly; the single-argument HttpData(String)
    // overload would silently fall back to the default encoding.
    fromHttpData(iterable.map(HttpData(_, charset)), ctx)

  /**
   * Props for an actor streaming raw byte arrays as chunks.
   *
   * @param iterable chunk payloads, in the order they should be sent
   * @param ctx      request context whose responder receives the chunks
   */
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props =
    fromHttpData(iterable.map(HttpData(_)), ctx)

  /**
   * Props for an actor streaming Akka `ByteString`s as chunks.
   *
   * @param iterable chunk payloads, in the order they should be sent
   * @param ctx      request context whose responder receives the chunks
   */
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props =
    fromHttpData(iterable.map(HttpData(_)), ctx)

  /**
   * Props for an actor streaming ready-made `HttpData` chunks.
   *
   * @param iterable chunk payloads, in the order they should be sent
   * @param ctx      request context whose responder receives the chunks
   */
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props =
    Props(new StreamingActor(iterable, ctx))

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck
}
/**
 * Sends `chunks` to the responder of `ctx` as a chunked HTTP response.
 *
 * The actor kicks itself off with `FirstChunk`, then sends one chunk at a time, each
 * with an ack request, and only sends the next chunk once `ChunkAck` arrives. When
 * the chunks run out it sends `ChunkedMessageEnd` and stops. If there are no chunks at
 * all it answers with a plain empty response instead of a chunked one.
 *
 * @param chunks chunk payloads, consumed lazily through a single iterator
 * @param ctx    request context whose responder receives the response parts
 */
class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  // Empty chunks are skipped: MessageChunk refuses empty data (it throws), which would
  // otherwise crash this actor in the middle of a response and leave it unterminated.
  val chunkIterator: Iterator[HttpData] = chunks.iterator.filter(_.nonEmpty)

  self ! FirstChunk

  def receive = {
    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    // The client went away before the stream finished: no further ack will arrive, so
    // stop here instead of lingering forever. (spray-can reports connection closure
    // with Akka IO's Tcp.ConnectionClosed events.)
    case closed: akka.io.Tcp.ConnectionClosed =>
      log.warning("Stopping response streaming due to {}", closed)
      context.stop(self)

    // Anything else falls through to Akka's default unhandled-message handling.
  }
}
$ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
GET /stream HTTP/1.1
Host: 127.0.0.1:8080
HTTP/1.1 200 OK
Server: spray-can/1.3.2
Date: Thu, 13 Nov 2014 19:45:53 GMT
Content-Type: text/html
Transfer-Encoding: chunked
5
first
6
second
5
third
0
^]
telnet> q
Connection closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment