Created
March 12, 2012 14:30
-
-
Save vastdevblog/2022320 to your computer and use it in GitHub Desktop.
Example of a Finagle server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vast.example | |
import java.net.InetSocketAddress | |
import java.util.UUID | |
import java.util.concurrent.{Executors, TimeUnit} | |
import com.google.common.base.Splitter | |
import com.twitter.finagle.http.Http | |
import com.twitter.finagle.builder.{Server, ServerBuilder} | |
import com.twitter.finagle.service.TimeoutFilter | |
import com.twitter.finagle.{Service, SimpleFilter, GlobalRequestTimeoutException} | |
import com.twitter.util.{Future, FuturePool, FutureTransformer, Duration} | |
import org.codehaus.jackson.map.ObjectMapper | |
import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer | |
import org.jboss.netty.handler.codec.http.{HttpResponseStatus, DefaultHttpResponse, HttpRequest, HttpResponse} | |
import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1 | |
import org.jboss.netty.handler.codec.http.HttpResponseStatus.{NOT_FOUND, INTERNAL_SERVER_ERROR} | |
import org.jboss.netty.util.CharsetUtil.UTF_8 | |
import org.jboss.netty.buffer.ChannelBuffer | |
/** | |
* Server entry point. | |
* | |
* @author Alex Moffat ([email protected]) | |
*/ | |
object ExampleServer { | |
def main(args: Array[String]) { | |
// Requests pass down through the filters to the service. The | |
// response from the service is passed back up through the filters | |
// to be returned. The Filter class provides the "andThen" method | |
// for creating the filter chain. | |
val service = | |
new ExceptionFilter andThen | |
new HeaderFilter andThen | |
new HttpTimeoutFilter(Duration(1, TimeUnit.SECONDS)) andThen | |
new RestService() | |
// Create the server and start it. The ServerBuilder is an example | |
// of the Type-Safe Builder pattern in scala. Options are set on | |
// the builder. Build is called to construct and start the server. | |
val server: Server = ServerBuilder() | |
.codec(Http()) | |
.bindTo(new InetSocketAddress(8080)) | |
.name("httpserver") | |
.build(service) | |
} | |
} | |
// Handle any uncaught exceptions that occur in filters or services | |
// lower down the chain. We extend SimpleFilter because we're not | |
// going to change the type of the request or response. | |
class ExceptionFilter extends SimpleFilter[HttpRequest, HttpResponse] { | |
// Deal with successful and error responses. | |
val transformer = | |
new FutureTransformer[HttpResponse, HttpResponse] { | |
// A successful response is just passed through. | |
override def map(value: HttpResponse): HttpResponse = | |
value | |
// An error is converted into a 500 response. The | |
// Responses object is in this file. | |
override def handle(throwable: Throwable): HttpResponse = | |
Responses.InternalServerError(throwable.getMessage) | |
} | |
// Apply method is where the filtering takes place. | |
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = { | |
// Use transformedBy method to deal with both the normal and error responses. | |
service(request).transformedBy(transformer) | |
} | |
} | |
// An trial filter that adds a header to the request and another | |
// to the response. | |
class HeaderFilter extends SimpleFilter[HttpRequest, HttpResponse] { | |
def apply(request: HttpRequest, service: Service[HttpRequest, HttpResponse]) = { | |
// Look for a X-Request-ID header and add one if missing. | |
val id = request.getHeader("X-Request-ID") | |
if (id == null) { | |
request.setHeader("X-Request-ID", | |
UUID.randomUUID().toString) | |
} | |
// Add X-Processes to the response. | |
service(request).onSuccess(r => { | |
r.addHeader("X-Processed", "TRUE") | |
}) | |
} | |
} | |
// Specialize the general TimeoutFilter to deal with | |
// HTTP requests and responses. | |
class HttpTimeoutFilter(timeout: Duration) | |
extends TimeoutFilter[HttpRequest, HttpResponse](timeout, | |
new GlobalRequestTimeoutException(timeout)) { | |
} | |
// Invoke the correct underlying service based on the incoming url. | |
class RestService extends Service[HttpRequest, HttpResponse] { | |
// Simple service that returns some json. | |
val okService = new SimpleService() | |
// Simple service that waits for longer that the configured | |
// timeout for the server. | |
val timeoutService = new SimpleService(Some(1200)) | |
// This pool is used to convert HttpResponse to | |
// Future[HttpResponse] by executing the service on | |
// a separate thread. | |
val futurePool = FuturePool(Executors.newFixedThreadPool(4)) | |
// Used to split URIs. | |
val splitter = Splitter.on('/').omitEmptyStrings() | |
def apply(req: HttpRequest) = { | |
val path = splitter.split(req.getUri).iterator() | |
// Match the pieces of the path | |
if (path.hasNext) { | |
path.next() match { | |
// Path starts with /ok so use ok service. | |
// Using futurePool creates a Future that is evaluated on a separate thread using | |
// the thread pool passed to the FuturePool constructor above. | |
case "ok" => futurePool { | |
// See Responses.OK comments below. | |
Responses.OK(req, | |
(req: HttpRequest) => { okService(req.getUri) })} | |
// Path starts with /timeout so use timeout service. | |
case "timeout" => futurePool { | |
Responses.OK(req, | |
(req: HttpRequest) => { timeoutService(req.getUri) })} | |
// No match so return a not found response. Future.value | |
// creates a Future with an existing constant value. | |
case _ => Future.value(Responses.NotFound()) | |
} | |
} else { | |
// No match so not found. | |
Future.value(Responses.NotFound()) | |
} | |
} | |
} | |
// A very simple service that returns a model object. | |
class SimpleService(val waitFor: Option[Int] = None) { | |
def apply(name: String) = { | |
// If a time to wait is supplied then wait. This | |
// is used to timeout the service. | |
waitFor.foreach(t => this.synchronized { wait(t) }) | |
// Return the model object. | |
new SimpleModel(UUID.randomUUID().toString, name, "Some Street") | |
} | |
} | |
// Objects to produce some standard http responses. | |
object Responses { | |
// Used to convert objects into json | |
val mapper = new ObjectMapper | |
// Create an HttpResponse from a status and some content. | |
private def respond(status: HttpResponseStatus, content: ChannelBuffer): HttpResponse = { | |
val response = new DefaultHttpResponse(HTTP_1_1, status) | |
response.setHeader("Content-Type", "application/json") | |
response.setHeader("Cache-Control", "no-cache") | |
response.setContent(content) | |
response | |
} | |
object OK { | |
def apply(req: HttpRequest, service: (HttpRequest) => Object): HttpResponse = | |
respond(HttpResponseStatus.OK, | |
copiedBuffer(mapper.writeValueAsBytes(service(req)))) | |
} | |
object NotFound { | |
def apply(): HttpResponse = | |
respond(NOT_FOUND, | |
copiedBuffer("{\"status\":\"NOT_FOUND\"}", UTF_8)) | |
} | |
object InternalServerError { | |
def apply(message: String): HttpResponse = | |
respond(INTERNAL_SERVER_ERROR, | |
copiedBuffer("{\"status\":\"INTERNAL_SERVER_ERROR\", " + | |
"\"message\":\"" + message + "\"}", UTF_8)) | |
} | |
} |
Found it, so
new MyFilter().andThen(serviceToBeWrapped)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Any chance I can see a Java version of filters? Even a simple version would be much appreciated. I'm struggling to hookup the service with andThen to the filter.