Created
July 29, 2013 10:12
-
-
Save eranharel/6103369 to your computer and use it in GitHub Desktop.
Netty Graphite Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.outbrain.gruffalo.publish; | |
import java.util.concurrent.TimeUnit; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import io.netty.bootstrap.Bootstrap; | |
import io.netty.channel.Channel; | |
import io.netty.channel.ChannelHandler; | |
import io.netty.channel.ChannelHandlerContext; | |
import io.netty.channel.ChannelInitializer; | |
import io.netty.channel.ChannelPipeline; | |
import io.netty.channel.EventLoop; | |
import io.netty.channel.EventLoopGroup; | |
import io.netty.channel.SimpleChannelInboundHandler; | |
import io.netty.channel.socket.nio.NioSocketChannel; | |
import io.netty.handler.codec.string.StringDecoder; | |
import io.netty.handler.codec.string.StringEncoder; | |
/** | |
* @author Eran Harel | |
*/ | |
class GraphiteMetricsPublisher implements MetricsPublisher { | |
private static final Logger log = LoggerFactory.getLogger(GraphiteHandler.class); | |
private static final int RECONNECT_DELAY_SEC = 5; | |
private final String graphiteHost; | |
private final int graphitePort; | |
private final EventLoopGroup eventLoopGroup; | |
private final ChannelHandler graphiteHandler = new GraphiteHandler(); | |
private final GraphiteMetricsPublisher.GraphiteClientChannelInitializer channelInitializer = new GraphiteClientChannelInitializer(); | |
private Channel channel; | |
public GraphiteMetricsPublisher(final String graphiteHost, final int graphitePort, final EventLoopGroup eventLoopGroup) { | |
this.graphiteHost = graphiteHost; | |
this.graphitePort = graphitePort; | |
this.eventLoopGroup = eventLoopGroup; | |
} | |
private Bootstrap configureBootstrap(EventLoopGroup eventLoopGroup) { | |
Bootstrap bootstrap = new Bootstrap(); | |
bootstrap.remoteAddress(graphiteHost, graphitePort); | |
bootstrap.group(eventLoopGroup); | |
bootstrap.channel(NioSocketChannel.class); | |
bootstrap.handler(channelInitializer); | |
return bootstrap; | |
} | |
public void connect() { | |
channel = configureBootstrap(eventLoopGroup).connect().channel(); | |
} | |
@Override | |
public void publishMetrics(String metrics) { | |
channel.writeAndFlush(metrics); | |
} | |
private class GraphiteClientChannelInitializer extends ChannelInitializer<Channel> { | |
private final StringDecoder DECODER = new StringDecoder(); | |
private final StringEncoder ENCODER = new StringEncoder(); | |
@Override | |
protected void initChannel(Channel channel) throws Exception { | |
ChannelPipeline pipeline = channel.pipeline(); | |
pipeline.addLast("decoder", DECODER); | |
pipeline.addLast("encoder", ENCODER); // we don't really read responses... | |
pipeline.addLast("handler", graphiteHandler); | |
} | |
} | |
@ChannelHandler.Sharable | |
private class GraphiteHandler extends SimpleChannelInboundHandler<String> { | |
@Override | |
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { | |
log.warn("Got an unexpected downstream message: " + msg); | |
} | |
@Override | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
log.info("Connected to: {}", ctx.channel().remoteAddress()); | |
} | |
@Override | |
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
log.warn("Got disconnected... will try to reconnect in {} sec...", RECONNECT_DELAY_SEC); | |
final EventLoop loop = ctx.channel().eventLoop(); | |
loop.schedule(new Runnable() { | |
@Override | |
public void run() { | |
log.info("Reconnecting to {}:{}", graphiteHost, graphitePort); | |
connect(); | |
} | |
}, RECONNECT_DELAY_SEC, TimeUnit.SECONDS); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |
log.error("Unexpected exception from downstream.", cause); | |
ctx.close(); | |
} | |
} | |
} |
GraphiteHandler instance can be static and so shared
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
StringEncoder and StringDecoder instances can be static and so shared.