Skip to content

Instantly share code, notes, and snippets.

@AitorATuin
Forked from YoEight/Server.scala
Last active January 22, 2016 19:54
Show Gist options
  • Save AitorATuin/89c3b522c17cccf39762 to your computer and use it in GitHub Desktop.
Save AitorATuin/89c3b522c17cccf39762 to your computer and use it in GitHub Desktop.
Chunk-encoded response using Unfiltered, Netty, and scalaz-stream
// Project coordinates.
name := "netty-stream"

version := "0.1-SNAPSHOT"

scalaVersion := "2.10.2"

// Libraries used by the server: Dispatch, the Unfiltered Netty server binding,
// and scalaz-stream.
libraryDependencies ++= Seq(
  "net.databinder.dispatch" %% "dispatch-core"           % "0.10.0",
  "net.databinder"          %% "unfiltered-netty-server" % "0.6.8",
  "org.scalaz.stream"       %% "scalaz-stream"           % "0.1-SNAPSHOT"
)
package deiko
import java.nio.charset.Charset
import java.util.concurrent.CancellationException
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener }
import org.jboss.netty.handler.codec.http.{ DefaultHttpChunk, HttpChunk }
import scalaz.stream.{ Process, process1, processes }
import scalaz.concurrent.Task
import Process.{ Process1, Sink }
import unfiltered.request.{ GET, Path, Seg }
import unfiltered.response.{ NotFound, TransferEncoding, PlainTextContent }
import unfiltered.netty.{ async, ReceivedMessage, ServerErrorResponse }
/**
 * Asynchronous Unfiltered plan that answers `GET /bar/<name>` by streaming the
 * file called `<name>` back to the client, line by line, as a chunk-encoded
 * plain-text HTTP response. Every other request is answered with `NotFound`.
 */
object Foo extends async.Plan with ServerErrorResponse {

  /** Routes `GET /bar/<name>` to [[stream]]; anything else gets `NotFound`. */
  def intent = {
    case req @ GET(Path(Seg("bar" :: name :: Nil))) => stream(name, req.underlying)
    case req => req.respond(NotFound)
  }

  /**
   * Writes the response head and then the content of `filename` as HTTP chunks,
   * finishing with `HttpChunk.LAST_CHUNK`.
   *
   * The streaming task is started with `runAsync`, so this method returns
   * without waiting for the transfer to finish. Waiting here (as a blocking
   * `run` would) stalls the thread the plan is invoked on, and a write that can
   * only complete once that same thread flushes it would never complete.
   *
   * If any step fails (for example the file cannot be read or a write fails)
   * the error is printed to standard error and the channel is closed; by then
   * the response head may already be on the wire, so no error status can be
   * sent reliably.
   *
   * @param filename path of the file to stream, taken from the request URL
   * @param message  the received Netty message whose channel is written to
   */
  def stream(filename: String, message: ReceivedMessage): Unit = {
    val channel = message.context.getChannel
    val end = Process.emit(HttpChunk.LAST_CHUNK)
    val resp = message.defaultResponse(TransferEncoding("Chunked") ~> PlainTextContent)
    // `linesR` yields lines without their terminator. Re-append it so the body
    // keeps its line breaks and, importantly, so a blank line never becomes an
    // empty chunk: Netty's DefaultHttpChunk reports an unreadable (empty)
    // buffer as the last chunk, which would end the chunked body prematurely.
    val lines = processes.linesR(filename).map(_ + "\n")
    val source = (lines |> toChunkProcess) ++ end
    val action = for {
      _ <- toTask(channel.write(resp))
      _ <- source.to(sink(channel)).run
    } yield ()
    action.runAsync { result =>
      result.fold(
        error => {
          error.printStackTrace()
          channel.close()
          ()
        },
        _ => ()
      )
    }
  }

  /**
   * Adapts a Netty `ChannelFuture` to a `Task[Unit]`.
   *
   * The task succeeds when the future reports success, fails with a
   * `CancellationException` when the future was cancelled, and otherwise fails
   * with the future's cause.
   *
   * @param future the Netty future to observe
   */
  def toTask(future: ChannelFuture): Task[Unit] =
    Task.async[Unit] { callback =>
      future.addListener(new ChannelFutureListener {
        def operationComplete(res: ChannelFuture): Unit = {
          if (res.isSuccess) callback(scalaz.\/-(()))
          else if (res.isCancelled) callback(scalaz.-\/(new CancellationException))
          else callback(scalaz.-\/(res.getCause))
        }
      })
    }

  /**
   * Sink that writes every incoming chunk to `channel`; each write is a task
   * that completes when the corresponding channel future completes.
   *
   * @param channel the Netty channel to write chunks to
   */
  def sink(channel: Channel): Sink[Task, HttpChunk] = {
    def go(input: HttpChunk) =
      toTask(channel.write(input))
    Process.await(Task.delay[HttpChunk => Task[Unit]](go))(Process.emit).repeat
  }

  /** Stream transducer that turns each input string into an HTTP chunk via [[toChunk]]. */
  val toChunkProcess: Process1[String, HttpChunk] =
    process1.lift(toChunk)

  /**
   * Wraps `input`, encoded as UTF-8, in a `DefaultHttpChunk`.
   *
   * @param input the text to encode
   */
  def toChunk(input: String): DefaultHttpChunk =
    new DefaultHttpChunk(ChannelBuffers.copiedBuffer(input, Charset.forName("UTF-8")))
}
/** Command-line entry point for the streaming server. */
object Server {

  /**
   * Creates an Unfiltered Netty HTTP server on port 9000, installs the `Foo`
   * plan as its handler, and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val server = unfiltered.netty.Http(9000).handler(Foo)
    server.run()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment