Skip to content

Instantly share code, notes, and snippets.

@kjunine
Created October 22, 2012 10:00
Show Gist options
  • Select an option

  • Save kjunine/3930727 to your computer and use it in GitHub Desktop.

Select an option

Save kjunine/3930727 to your computer and use it in GitHub Desktop.
Web Socket Server Handler
public class WebSocketServer {
private final InetSocketAddress localAddress;
private final ServerBootstrap bootstrap;
public WebSocketServer (InetSocketAddress localAddress) {
this.localAddress = localAddress;
this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(new NamedThreadFactory(
"NettyBoss")),
Executors.newCachedThreadPool(new NamedThreadFactory(
"NettyWorker"))));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new HttpRequestDecoder(),
new HttpChunkAggregator(Integer.MAX_VALUE),
new HttpResponseEncoder(),
new WebSocketServerHandler(),
new Handler());
}
});
}
}
public class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
private static final String WEBSOCKET_PATH = "/test";
private final Log log = LogFactory.getLog(getClass());
private WebSocketServerHandshaker handshaker;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Object msg = e.getMessage();
if (msg instanceof HttpRequest) {
handleHttpRequest(ctx, (HttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg, e);
} else {
if (log.isDebugEnabled()) {
log.debug("not a HttpRequest or WebSocketFrame: "
+ msg.getClass());
}
}
}
private void handleHttpRequest(ChannelHandlerContext ctx,
HttpRequest request) {
if (log.isDebugEnabled()) {
log.debug("request: " + request);
}
// Allow only GET methods.
if (request.getMethod() != GET) {
sendHttpResponse(ctx, request, new DefaultHttpResponse(HTTP_1_1,
FORBIDDEN));
return;
}
// Allow only WebSocket.
if (!Values.UPGRADE.equalsIgnoreCase(request.getHeader(CONNECTION))
|| !WEBSOCKET
.equalsIgnoreCase(request.getHeader(Names.UPGRADE))) {
sendHttpResponse(ctx, request, new DefaultHttpResponse(HTTP_1_1,
FORBIDDEN));
return;
}
// Handshake
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(request), null, false);
handshaker = factory.newHandshaker(request);
if (handshaker == null) {
factory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel());
} else {
handshaker.handshake(ctx.getChannel(), request).addListener(
WebSocketServerHandshaker.HANDSHAKE_LISTENER);
}
}
private void handleWebSocketFrame(ChannelHandlerContext ctx,
WebSocketFrame frame, MessageEvent e) {
Channel channel = ctx.getChannel();
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(channel, (CloseWebSocketFrame) frame);
return;
} else if (frame instanceof PingWebSocketFrame) {
channel.write(new PongWebSocketFrame(frame.getBinaryData()));
return;
} else if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format(
"%s frame types not supported", frame.getClass()));
}
ctx.sendUpstream(e);
}
private static void sendHttpResponse(ChannelHandlerContext ctx,
HttpRequest req, HttpResponse response) {
// Generate an error page if response status code is not OK (200).
if (response.getStatus().getCode() != 200) {
response.setContent(ChannelBuffers.copiedBuffer(response
.getStatus().toString(), CharsetUtil.UTF_8));
setContentLength(response, response.getContent().readableBytes());
}
// Send the response and close the connection if necessary.
ChannelFuture f = ctx.getChannel().write(response);
if (!isKeepAlive(req) || response.getStatus().getCode() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private static String getWebSocketLocation(HttpRequest req) {
return "ws://" + req.getHeader(HOST) + WEBSOCKET_PATH;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment