Created
July 18, 2014 15:46
-
-
Save bancek/c0be8a716ef3144dcd5b to your computer and use it in GitHub Desktop.
Finagle Thrift Statsd filter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.koofr.finagle.thrift.stats | |
import com.twitter.finagle.SimpleFilter | |
import org.apache.thrift.protocol.TBinaryProtocol | |
import com.twitter.finagle.Service | |
import org.apache.thrift.transport.TMemoryInputTransport | |
import play.modules.statsd.api.Statsd | |
import com.twitter.finagle.thrift.ThriftClientRequest | |
import play.api.Logger | |
// Usage: | |
// | |
// val stats = new ClientThriftStats(serviceName) | |
// val serviceWithStats = stats andThen service | |
abstract class ThriftStats[T](typ: String, serviceName: String) extends SimpleFilter[T, Array[Byte]] { | |
val factory = new TBinaryProtocol.Factory() | |
def getData(request: T): Array[Byte] | |
def apply(request: T, service: Service[T, Array[Byte]]) = { | |
val start = System.currentTimeMillis() | |
val transport = new TMemoryInputTransport(getData(request)) | |
val protocol = new TBinaryProtocol.Factory().getProtocol(transport) | |
val msg = protocol.readMessageBegin() | |
val nameLower = msg.name.toLowerCase | |
val keys = Seq( | |
s"thrift.${typ}.${serviceName}.${nameLower}.", | |
s"thrift.${typ}.${serviceName}.all.", | |
s"thrift.${typ}.all." | |
) | |
keys.map(_ + "request") foreach (Statsd.increment(_)) | |
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower} request") | |
val resp = service(request) | |
resp respond { respTry => | |
val time = System.currentTimeMillis() - start | |
keys.map(_ + "time") foreach (Statsd.timing(_, time)) | |
keys.map(_ + "response.done") foreach (Statsd.increment(_)) | |
respTry match { | |
case com.twitter.util.Return(r) => | |
keys.map(_ + "response.success") foreach (Statsd.increment(_)) | |
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower}: success (${time} ms)") | |
case com.twitter.util.Throw(e) => | |
keys.map(_ + "response.error") foreach (Statsd.increment(_)) | |
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower}: error (${time} ms)") | |
} | |
} | |
resp | |
} | |
} | |
class ServerThriftStats(serviceName: String) extends ThriftStats[Array[Byte]]("server", serviceName) { | |
def getData(request: Array[Byte]) = request | |
} | |
class ClientThriftStats(serviceName: String) extends ThriftStats[ThriftClientRequest]("client", serviceName) { | |
def getData(request: ThriftClientRequest) = request.message | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment