Created
October 22, 2012 10:00
-
-
Save kjunine/3930727 to your computer and use it in GitHub Desktop.
Web Socket Server Handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class WebSocketServer { | |
| private final InetSocketAddress localAddress; | |
| private final ServerBootstrap bootstrap; | |
| public WebSocketServer (InetSocketAddress localAddress) { | |
| this.localAddress = localAddress; | |
| this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( | |
| Executors.newCachedThreadPool(new NamedThreadFactory( | |
| "NettyBoss")), | |
| Executors.newCachedThreadPool(new NamedThreadFactory( | |
| "NettyWorker")))); | |
| bootstrap.setPipelineFactory(new ChannelPipelineFactory() { | |
| @Override | |
| public ChannelPipeline getPipeline() throws Exception { | |
| return Channels.pipeline(new HttpRequestDecoder(), | |
| new HttpChunkAggregator(Integer.MAX_VALUE), | |
| new HttpResponseEncoder(), | |
| new WebSocketServerHandler(), | |
| new Handler()); | |
| } | |
| }); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| public class WebSocketServerHandler extends SimpleChannelUpstreamHandler { | |
| private static final String WEBSOCKET_PATH = "/test"; | |
| private final Log log = LogFactory.getLog(getClass()); | |
| private WebSocketServerHandshaker handshaker; | |
| @Override | |
| public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) | |
| throws Exception { | |
| Object msg = e.getMessage(); | |
| if (msg instanceof HttpRequest) { | |
| handleHttpRequest(ctx, (HttpRequest) msg); | |
| } else if (msg instanceof WebSocketFrame) { | |
| handleWebSocketFrame(ctx, (WebSocketFrame) msg, e); | |
| } else { | |
| if (log.isDebugEnabled()) { | |
| log.debug("not a HttpRequest or WebSocketFrame: " | |
| + msg.getClass()); | |
| } | |
| } | |
| } | |
| private void handleHttpRequest(ChannelHandlerContext ctx, | |
| HttpRequest request) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("request: " + request); | |
| } | |
| // Allow only GET methods. | |
| if (request.getMethod() != GET) { | |
| sendHttpResponse(ctx, request, new DefaultHttpResponse(HTTP_1_1, | |
| FORBIDDEN)); | |
| return; | |
| } | |
| // Allow only WebSocket. | |
| if (!Values.UPGRADE.equalsIgnoreCase(request.getHeader(CONNECTION)) | |
| || !WEBSOCKET | |
| .equalsIgnoreCase(request.getHeader(Names.UPGRADE))) { | |
| sendHttpResponse(ctx, request, new DefaultHttpResponse(HTTP_1_1, | |
| FORBIDDEN)); | |
| return; | |
| } | |
| // Handshake | |
| WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( | |
| getWebSocketLocation(request), null, false); | |
| handshaker = factory.newHandshaker(request); | |
| if (handshaker == null) { | |
| factory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel()); | |
| } else { | |
| handshaker.handshake(ctx.getChannel(), request).addListener( | |
| WebSocketServerHandshaker.HANDSHAKE_LISTENER); | |
| } | |
| } | |
| private void handleWebSocketFrame(ChannelHandlerContext ctx, | |
| WebSocketFrame frame, MessageEvent e) { | |
| Channel channel = ctx.getChannel(); | |
| if (frame instanceof CloseWebSocketFrame) { | |
| handshaker.close(channel, (CloseWebSocketFrame) frame); | |
| return; | |
| } else if (frame instanceof PingWebSocketFrame) { | |
| channel.write(new PongWebSocketFrame(frame.getBinaryData())); | |
| return; | |
| } else if (!(frame instanceof TextWebSocketFrame)) { | |
| throw new UnsupportedOperationException(String.format( | |
| "%s frame types not supported", frame.getClass())); | |
| } | |
| ctx.sendUpstream(e); | |
| } | |
| private static void sendHttpResponse(ChannelHandlerContext ctx, | |
| HttpRequest req, HttpResponse response) { | |
| // Generate an error page if response status code is not OK (200). | |
| if (response.getStatus().getCode() != 200) { | |
| response.setContent(ChannelBuffers.copiedBuffer(response | |
| .getStatus().toString(), CharsetUtil.UTF_8)); | |
| setContentLength(response, response.getContent().readableBytes()); | |
| } | |
| // Send the response and close the connection if necessary. | |
| ChannelFuture f = ctx.getChannel().write(response); | |
| if (!isKeepAlive(req) || response.getStatus().getCode() != 200) { | |
| f.addListener(ChannelFutureListener.CLOSE); | |
| } | |
| } | |
| private static String getWebSocketLocation(HttpRequest req) { | |
| return "ws://" + req.getHeader(HOST) + WEBSOCKET_PATH; | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment