Skip to content

Instantly share code, notes, and snippets.

@vastdevblog
Created March 12, 2012 14:30
Show Gist options
  • Save vastdevblog/2022320 to your computer and use it in GitHub Desktop.
Save vastdevblog/2022320 to your computer and use it in GitHub Desktop.
Example of a Finagle server
package com.vast.example
import java.net.InetSocketAddress
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
import com.google.common.base.Splitter
import com.twitter.finagle.http.Http
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.{Service, SimpleFilter, GlobalRequestTimeoutException}
import com.twitter.util.{Future, FuturePool, FutureTransformer, Duration}
import org.codehaus.jackson.map.ObjectMapper
import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
import org.jboss.netty.handler.codec.http.{HttpResponseStatus, DefaultHttpResponse, HttpRequest, HttpResponse}
import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
import org.jboss.netty.handler.codec.http.HttpResponseStatus.{NOT_FOUND, INTERNAL_SERVER_ERROR}
import org.jboss.netty.util.CharsetUtil.UTF_8
import org.jboss.netty.buffer.ChannelBuffer
/**
* Server entry point.
*
* @author Alex Moffat ([email protected])
*/
object ExampleServer {
def main(args: Array[String]) {
// Requests pass down through the filters to the service. The
// response from the service is passed back up through the filters
// to be returned. The Filter class provides the "andThen" method
// for creating the filter chain.
val service =
new ExceptionFilter andThen
new HeaderFilter andThen
new HttpTimeoutFilter(Duration(1, TimeUnit.SECONDS)) andThen
new RestService()
// Create the server and start it. The ServerBuilder is an example
// of the Type-Safe Builder pattern in scala. Options are set on
// the builder. Build is called to construct and start the server.
val server: Server = ServerBuilder()
.codec(Http())
.bindTo(new InetSocketAddress(8080))
.name("httpserver")
.build(service)
}
}
// Handle any uncaught exceptions that occur in filters or services
// lower down the chain. We extend SimpleFilter because we're not
// going to change the type of the request or response.
class ExceptionFilter extends SimpleFilter[HttpRequest, HttpResponse] {
// Deal with successful and error responses.
val transformer =
new FutureTransformer[HttpResponse, HttpResponse] {
// A successful response is just passed through.
override def map(value: HttpResponse): HttpResponse =
value
// An error is converted into a 500 response. The
// Responses object is in this file.
override def handle(throwable: Throwable): HttpResponse =
Responses.InternalServerError(throwable.getMessage)
}
// Apply method is where the filtering takes place.
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
// Use transformedBy method to deal with both the normal and error responses.
service(request).transformedBy(transformer)
}
}
// An trial filter that adds a header to the request and another
// to the response.
class HeaderFilter extends SimpleFilter[HttpRequest, HttpResponse] {
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = {
// Look for a X-Request-ID header and add one if missing.
val id = request.getHeader("X-Request-ID")
if (id == null) {
request.setHeader("X-Request-ID",
UUID.randomUUID().toString)
}
// Add X-Processes to the response.
service(request).onSuccess(r => {
r.addHeader("X-Processed", "TRUE")
})
}
}
// Specialize the general TimeoutFilter to deal with
// HTTP requests and responses.
class HttpTimeoutFilter(timeout: Duration)
extends TimeoutFilter[HttpRequest, HttpResponse](timeout,
new GlobalRequestTimeoutException(timeout)) {
}
// Invoke the correct underlying service based on the incoming url.
class RestService extends Service[HttpRequest, HttpResponse] {
// Simple service that returns some json.
val okService = new SimpleService()
// Simple service that waits for longer that the configured
// timeout for the server.
val timeoutService = new SimpleService(Some(1200))
// This pool is used to convert HttpResponse to
// Future[HttpResponse] by executing the service on
// a separate thread.
val futurePool = FuturePool(Executors.newFixedThreadPool(4))
// Used to split URIs.
val splitter = Splitter.on('/').omitEmptyStrings()
def apply(req: HttpRequest) = {
val path = splitter.split(req.getUri).iterator()
// Match the pieces of the path
if (path.hasNext) {
path.next() match {
// Path starts with /ok so use ok service.
// Using futurePool creates a Future that is evaluated on a separate thread using
// the thread pool passed to the FuturePool constructor above.
case "ok" => futurePool {
// See Responses.OK comments below.
Responses.OK(req,
(req: HttpRequest) => { okService(req.getUri) })}
// Path starts with /timeout so use timeout service.
case "timeout" => futurePool {
Responses.OK(req,
(req: HttpRequest) => { timeoutService(req.getUri) })}
// No match so return a not found response. Future.value
// creates a Future with an existing constant value.
case _ => Future.value(Responses.NotFound())
}
} else {
// No match so not found.
Future.value(Responses.NotFound())
}
}
}
// A very simple service that returns a model object.
class SimpleService(val waitFor: Option[Int] = None) {
def apply(name: String) = {
// If a time to wait is supplied then wait. This
// is used to timeout the service.
waitFor.foreach(t => this.synchronized { wait(t) })
// Return the model object.
new SimpleModel(UUID.randomUUID().toString, name, "Some Street")
}
}
// Objects to produce some standard http responses.
object Responses {
// Used to convert objects into json
val mapper = new ObjectMapper
// Create an HttpResponse from a status and some content.
private def respond(status: HttpResponseStatus, content: ChannelBuffer): HttpResponse = {
val response = new DefaultHttpResponse(HTTP_1_1, status)
response.setHeader("Content-Type", "application/json")
response.setHeader("Cache-Control", "no-cache")
response.setContent(content)
response
}
object OK {
def apply(req: HttpRequest, service: (HttpRequest) => Object): HttpResponse =
respond(HttpResponseStatus.OK,
copiedBuffer(mapper.writeValueAsBytes(service(req))))
}
object NotFound {
def apply(): HttpResponse =
respond(NOT_FOUND,
copiedBuffer("{\"status\":\"NOT_FOUND\"}", UTF_8))
}
object InternalServerError {
def apply(message: String): HttpResponse =
respond(INTERNAL_SERVER_ERROR,
copiedBuffer("{\"status\":\"INTERNAL_SERVER_ERROR\", " +
"\"message\":\"" + message + "\"}", UTF_8))
}
}
@ravindranathakila
Copy link

Any chance I can see a Java version of filters? Even a simple version would be much appreciated. I'm struggling to hookup the service with andThen to the filter.

@ravindranathakila
Copy link

Found it, so

new MyFilter().andThen(serviceToBeWrapped)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment