Skip to content

Instantly share code, notes, and snippets.

@eranharel
Created July 29, 2013 10:12
Show Gist options
  • Save eranharel/6103369 to your computer and use it in GitHub Desktop.
Save eranharel/6103369 to your computer and use it in GitHub Desktop.
Netty Graphite Client
package com.outbrain.gruffalo.publish;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Publishes metrics to a Graphite server over a Netty TCP client connection.
 *
 * <p>{@link #connect()} must be called once to start the first connection attempt. Whenever a
 * channel is unregistered, a new connection attempt is scheduled on that channel's event loop
 * after {@value #RECONNECT_DELAY_SEC} seconds. Metrics published while no active channel is
 * available are dropped.
 *
 * @author Eran Harel
 */
class GraphiteMetricsPublisher implements MetricsPublisher {

  // The logging category is kept as the handler's class name so existing logger configuration still applies.
  private static final Logger log = LoggerFactory.getLogger(GraphiteHandler.class);

  /** Delay, in seconds, between losing a channel and the next connection attempt. */
  private static final int RECONNECT_DELAY_SEC = 5;

  // Netty's String codecs are annotated as sharable, so a single instance of each can be
  // installed into every pipeline this class creates.
  private static final StringDecoder DECODER = new StringDecoder();
  private static final StringEncoder ENCODER = new StringEncoder();

  private final String graphiteHost;
  private final int graphitePort;
  private final EventLoopGroup eventLoopGroup;
  private final ChannelHandler graphiteHandler = new GraphiteHandler();
  private final ChannelInitializer<Channel> channelInitializer = new GraphiteClientChannelInitializer();

  // Reassigned by the reconnect task (event loop thread) and read by publishMetrics (caller
  // threads); volatile makes the most recent channel visible to publishers.
  private volatile Channel channel;

  /**
   * Creates a publisher; no connection is attempted until {@link #connect()} is called.
   *
   * @param graphiteHost host name or address of the Graphite server
   * @param graphitePort TCP port of the Graphite server
   * @param eventLoopGroup event loop group used for the client channel
   */
  public GraphiteMetricsPublisher(final String graphiteHost, final int graphitePort, final EventLoopGroup eventLoopGroup) {
    this.graphiteHost = graphiteHost;
    this.graphitePort = graphitePort;
    this.eventLoopGroup = eventLoopGroup;
  }

  /**
   * Builds a client bootstrap that targets the configured Graphite address.
   *
   * @param eventLoopGroup event loop group the new channel will be bound to
   * @return a bootstrap ready for {@code connect()}
   */
  private Bootstrap configureBootstrap(EventLoopGroup eventLoopGroup) {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.remoteAddress(graphiteHost, graphitePort);
    bootstrap.group(eventLoopGroup);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.handler(channelInitializer);
    return bootstrap;
  }

  /**
   * Starts a connection attempt and records the resulting channel as the current one.
   *
   * <p>The connect future is not awaited here, so the stored channel may not be active yet
   * when this method returns.
   */
  public void connect() {
    channel = configureBootstrap(eventLoopGroup).connect().channel();
  }

  /**
   * Writes the given metrics to the current channel and flushes it.
   *
   * <p>If {@link #connect()} has not been called yet, or the current channel is not active,
   * the metrics are dropped (logged at debug level) instead of failing the caller.
   *
   * @param metrics the metrics payload to send
   */
  @Override
  public void publishMetrics(String metrics) {
    // Read the volatile field once so the null/active check and the write use the same channel.
    final Channel current = channel;
    if (current == null || !current.isActive()) {
      log.debug("Not connected to {}:{}; dropping metrics", graphiteHost, graphitePort);
      return;
    }
    current.writeAndFlush(metrics);
  }

  /** Installs the String codecs and the Graphite handler into every new channel's pipeline. */
  private class GraphiteClientChannelInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel channel) throws Exception {
      ChannelPipeline pipeline = channel.pipeline();
      pipeline.addLast("decoder", DECODER);
      pipeline.addLast("encoder", ENCODER); // we don't really read responses...
      pipeline.addLast("handler", graphiteHandler);
    }
  }

  /**
   * Logs connection lifecycle events and schedules a reconnect when a channel goes away.
   *
   * <p>One instance is added to every pipeline created by this publisher, hence the
   * {@code Sharable} annotation.
   */
  @ChannelHandler.Sharable
  private class GraphiteHandler extends SimpleChannelInboundHandler<String> {

    /** Inbound data is not expected on this connection; anything received is only logged. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
      log.warn("Got an unexpected downstream message: {}", msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      log.info("Connected to: {}", ctx.channel().remoteAddress());
    }

    /**
     * Schedules a new connection attempt on the channel's event loop after the reconnect delay,
     * unless that event loop is already shutting down.
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
      final EventLoop loop = ctx.channel().eventLoop();
      if (loop.isShuttingDown()) {
        // A shutting-down loop rejects newly scheduled tasks, so a reconnect cannot be queued.
        log.info("Event loop is shutting down; not reconnecting to {}:{}", graphiteHost, graphitePort);
        return;
      }

      log.warn("Got disconnected... will try to reconnect in {} sec...", RECONNECT_DELAY_SEC);
      loop.schedule(new Runnable() {
        @Override
        public void run() {
          log.info("Reconnecting to {}:{}", graphiteHost, graphitePort);
          connect();
        }
      }, RECONNECT_DELAY_SEC, TimeUnit.SECONDS);
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      log.error("Unexpected exception from downstream.", cause);
      ctx.close();
    }
  }
}
@normanmaurer
Copy link

StringEncoder and StringDecoder instances can be static and so shared.

@normanmaurer
Copy link

GraphiteHandler instance can be static and so shared

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment