-
-
Save AitorATuin/89c3b522c17cccf39762 to your computer and use it in GitHub Desktop.
Chunk-encoded response using Unfiltered, Netty and Scalaz-Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the chunked-streaming demo.

name := "netty-stream"

version := "0.1-SNAPSHOT"

scalaVersion := "2.10.2"

// HTTP client, Unfiltered's Netty server binding, and scalaz-stream.
libraryDependencies ++= Seq(
  "net.databinder.dispatch" %% "dispatch-core"           % "0.10.0",
  "net.databinder"          %% "unfiltered-netty-server" % "0.6.8",
  "org.scalaz.stream"       %% "scalaz-stream"           % "0.1-SNAPSHOT"
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package deiko | |
import java.nio.charset.Charset | |
import java.util.concurrent.CancellationException | |
import org.jboss.netty.buffer.ChannelBuffers | |
import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener } | |
import org.jboss.netty.handler.codec.http.{ DefaultHttpChunk, HttpChunk } | |
import scalaz.stream.{ Process, process1, processes } | |
import scalaz.concurrent.Task | |
import Process.{ Process1, Sink } | |
import unfiltered.request.{ GET, Path, Seg } | |
import unfiltered.response.{ NotFound, TransferEncoding, PlainTextContent } | |
import unfiltered.netty.{ async, ReceivedMessage, ServerErrorResponse } | |
/** Unfiltered async plan that streams a text file to the client as a
  * chunked HTTP response.
  *
  * `GET /bar/<name>` reads the file `<name>` line by line, turns every line
  * into an HTTP chunk and writes the chunks to the Netty channel, finishing
  * with `HttpChunk.LAST_CHUNK`. Every other request is answered with
  * `NotFound`.
  */
object Foo extends async.Plan with ServerErrorResponse {

  // Looked up once instead of on every chunk.
  private val Utf8: Charset = Charset.forName("UTF-8")

  def intent = {
    // NOTE(review): `name` comes straight from the request URL and is used as
    // a file name below; restrict it to a known directory / whitelist before
    // exposing this to untrusted clients.
    case req @ GET(Path(Seg("bar" :: name :: Nil))) => stream(name, req.underlying)
    case req => req.respond(NotFound)
  }

  /** Writes the response header followed by the content of `filename`,
    * one chunk per non-empty line, to the channel behind `message`.
    *
    * The work is started asynchronously: this method returns immediately
    * instead of blocking the calling thread until the last chunk has been
    * written. If reading the file or writing to the channel fails, the
    * channel is closed, because the response header may already be on the
    * wire and a well-formed error response is no longer possible.
    *
    * @param filename name of the file to stream
    * @param message  the received request whose channel is written to
    */
  def stream(filename: String, message: ReceivedMessage): Unit = {
    val channel = message.context.getChannel
    val end = Process.emit(HttpChunk.LAST_CHUNK)
    val resp = message.defaultResponse(TransferEncoding("Chunked") ~> PlainTextContent)
    // A chunk with empty content is treated by Netty 3 as the terminating
    // chunk, so an empty line would end the response prematurely. Drop empty
    // lines before they are turned into chunks.
    // NOTE(review): line terminators are presumably stripped by `linesR` and
    // are not re-added here, so the client receives the lines concatenated.
    val lines = processes.linesR(filename) |> process1.filter[String](_.nonEmpty)
    val source = (lines |> toChunkProcess) ++ end
    val action = for {
      _ <- toTask(channel.write(resp))
      _ <- source.to(sink(channel)).run
    } yield ()
    action.runAsync {
      case scalaz.-\/(_) => channel.close()
      case scalaz.\/-(_) => ()
    }
  }

  /** Adapts a Netty `ChannelFuture` to a `Task` that completes when the
    * future does: successfully on success, with a `CancellationException`
    * when the future was cancelled, and with the future's cause otherwise.
    */
  def toTask(future: ChannelFuture): Task[Unit] =
    Task.async[Unit] { callback =>
      future.addListener(new ChannelFutureListener {
        def operationComplete(res: ChannelFuture): Unit = {
          if (res.isSuccess) callback(scalaz.\/-(()))
          else if (res.isCancelled) callback(scalaz.-\/(new CancellationException))
          else callback(scalaz.-\/(res.getCause))
        }
      })
    }

  /** Sink that writes every incoming chunk to `channel`, waiting for each
    * write's future to complete before the next chunk is taken.
    */
  def sink(channel: Channel): Sink[Task, HttpChunk] = {
    def go(input: HttpChunk) =
      toTask(channel.write(input))

    Process.await(Task.delay[HttpChunk => Task[Unit]](go))(Process.emit).repeat
  }

  /** `toChunk` lifted into a stream transducer. */
  val toChunkProcess: Process1[String, HttpChunk] =
    process1.lift(toChunk)

  /** Wraps `input`, encoded as UTF-8, in a single HTTP chunk. */
  def toChunk(input: String): DefaultHttpChunk =
    new DefaultHttpChunk(ChannelBuffers.copiedBuffer(input, Utf8))
}
/** Entry point: serves the `Foo` plan over HTTP with Unfiltered's Netty
  * server.
  */
object Server {

  // Port the demo server listens on.
  private val Port = 9000

  /** Starts the server on `Port` and runs it.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    unfiltered.netty.Http(Port).handler(Foo).run()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment