Pull request #1615 is intended to add SockJS support to Netty. It does so by adding an API on top of Netty that end users have to implement. The interface that users have to implement looks like this:
/**
 * Represents the server side business application server in SockJS.
 */
public interface SockJsService {

    /**
     * The {@link SockJsConfig} for this service
     *
     * @return {@link SockJsConfig} this services configuration.
     */
    SockJsConfig config();

    /**
     * Will be called when a new session is opened.
     *
     * @param session the {@link SockJsSessionContext} which can be stored and used for sending/closing.
     */
    void onOpen(SockJsSessionContext session);

    /**
     * Will be called when a message is sent to the service.
     *
     * @param message the message sent from a client.
     * @throws Exception
     */
    void onMessage(String message) throws Exception;

    /**
     * Will be called when the session is closed.
     */
    void onClose();
}
This document discusses an alternative implementation in which only existing Netty APIs are used.
The sockjs-refactoring branch is being used to experiment with this.
Instead of implementing the SockJsService interface, a SockJS service will be a simple ChannelHandler, for example:
public class SockJsEchoHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        ctx.writeAndFlush(msg);
    }
}
The following example will be used to explain some of the details and issues with implementing this.
The onOpen method in the original SockJsService interface does not really have an equivalent in Netty, because onOpen refers to a SockJS session being opened. This may sometimes map to a channelActive event, but not always, depending on the transport being used.
At the moment this is handled via a user event, and looks like this:
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
    if (evt == Event.ON_SESSION_OPEN) {
        logger.info("Connected");
    }
}
While it might seem most logical to handle onClose using Netty's close method on a ChannelHandler, onClose concerns closing the SockJS session. For some transports, such as a polling transport, Netty's close method would be called more often than the SockJS session is closed. Therefore closing the session is also handled using a user event:
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
    if (evt == Event.CLOSE_SESSION) {
        logger.info("Session closing");
    }
}
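To illustrate why session events are decoupled from channel events, here is a minimal, Netty-free sketch. The class SessionLifecycleSketch and its methods are hypothetical and are not part of the branch; only the two event names mirror the constants used above. It models a polling transport in which several short-lived channels serve one SockJS session, so the open and close events fire once per session rather than once per channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not code from the branch.
final class SessionLifecycleSketch {

    // Mirrors the names Event.ON_SESSION_OPEN and Event.CLOSE_SESSION used above.
    enum Event { ON_SESSION_OPEN, CLOSE_SESSION }

    // Number of channels that have served each open session so far.
    private final Map<String, Integer> channelsSeen = new HashMap<>();

    // Events that a handler would receive via userEventTriggered, recorded for inspection.
    final List<Event> fired = new ArrayList<>();

    // A channel carrying a request for sessionId became active.
    void channelActive(String sessionId) {
        int seen = channelsSeen.merge(sessionId, 1, Integer::sum);
        if (seen == 1) {
            fired.add(Event.ON_SESSION_OPEN); // only the first channel opens the session
        }
    }

    // A channel closed. With a polling transport this happens after every poll.
    void channelInactive(String sessionId) {
        // Intentionally fires nothing: the session outlives the channel.
    }

    // The SockJS session itself ended (assumed trigger: client close or session timeout).
    void sessionClosed(String sessionId) {
        if (channelsSeen.remove(sessionId) != null) {
            fired.add(Event.CLOSE_SESSION);
        }
    }

    public static void main(String[] args) {
        SessionLifecycleSketch sketch = new SessionLifecycleSketch();
        // Two polls on the same session, each on its own channel.
        sketch.channelActive("123");
        sketch.channelInactive("123");
        sketch.channelActive("123");
        sketch.channelInactive("123");
        sketch.sessionClosed("123");
        System.out.println(sketch.fired); // prints [ON_SESSION_OPEN, CLOSE_SESSION]
    }
}
```

Two channels were opened and closed, yet the handler sees a single open and a single close, which is the behaviour the user events are meant to provide.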
Taken from NettySockJsServer.java
public void run() throws Exception {
    final EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        final ServerBootstrap sb = new ServerBootstrap();
        sb.channel(NioSockJsServerChannel.class);
        sb.group(bossGroup, workerGroup);
        final CorsConfig corsConfig = DefaultSockJsConfig.defaultCorsConfig("test", "*")
                .allowedRequestHeaders("a", "b", "c")
                .allowNullOrigin()
                .allowedRequestMethods(POST, GET, OPTIONS)
                .build();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsEchoHandler());
            }
        });
        sb.option(PREFIX, "/echo");
        sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsCloseHandler());
            }
        });
        sb.option(PREFIX, "/close");
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsEchoHandler());
            }
        });
        sb.option(PREFIX, "/cookie_needed_echo");
        sb.childOption(COOKIES_NEEDED, true);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsWSDisabledHandler());
            }
        });
        sb.option(PREFIX, "/disabled_websocket_echo");
        sb.childOption(WEBSOCKET_ENABLED, false);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.register();

        final Channel ch = sb.bind(port).sync().channel();
        System.out.println("Web socket server started on port [" + port + "], ");
        ch.closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
In the above example we are configuring multiple SockJS services/handlers, four in total, named echo, close, cookie_needed_echo, and disabled_websocket_echo. The handlers in this case match those that are required by the sockjs-protocol testsuite.
Notice also that we are calling register after having configured each service/handler. This is where SockJS differs from standard Netty. When a request is processed, it is the prefix in the request URL that determines which SockJS service should process the request.
For example, a request URL might look like /echo/123/123/xhr, where echo is the prefix, or service name if you prefer. So we don't know the target service/handler of the request until we have processed the HTTP request.
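As a rough illustration of how such a URL breaks down, the sketch below splits a request path into its parts. The class SockJsPathSketch is hypothetical and is not how the branch parses requests. It assumes the usual SockJS session URL layout of prefix, server id, session id, and transport, returns the prefix with a leading slash so that it lines up with the values passed to the PREFIX option, and does not handle query strings.

```java
import java.util.Arrays;

// Hypothetical sketch, not code from the branch.
final class SockJsPathSketch {
    final String prefix;    // e.g. "/echo"
    final String serverId;  // e.g. "123"
    final String sessionId; // e.g. "123"
    final String transport; // e.g. "xhr"

    private SockJsPathSketch(String prefix, String serverId, String sessionId, String transport) {
        this.prefix = prefix;
        this.serverId = serverId;
        this.sessionId = sessionId;
        this.transport = transport;
    }

    // Returns the parsed path, or null if the path is not of the form
    // /<prefix>/<server id>/<session id>/<transport>.
    static SockJsPathSketch parse(String path) {
        // "/echo/123/123/xhr" splits into ["", "echo", "123", "123", "xhr"].
        String[] parts = path.split("/");
        if (parts.length < 5 || !parts[0].isEmpty()) {
            return null;
        }
        for (int i = 1; i < parts.length; i++) {
            if (parts[i].isEmpty()) {
                return null; // reject empty segments such as "/echo//123/xhr"
            }
        }
        int n = parts.length;
        // Everything before the last three segments is treated as the prefix.
        String prefix = String.join("/", Arrays.copyOfRange(parts, 0, n - 3));
        return new SockJsPathSketch(prefix, parts[n - 3], parts[n - 2], parts[n - 1]);
    }

    public static void main(String[] args) {
        SockJsPathSketch p = parse("/echo/123/123/xhr");
        System.out.println(p.prefix + " " + p.sessionId + " " + p.transport); // prints /echo 123 xhr
    }
}
```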
When register is called, NioSockJsServerChannel will not create a ServerSocketChannel, which you might be used to when using the normal NioServerSocketChannel. Instead, register only creates a SockJsService for the current configuration and child handler:
@Override
protected void doRegister() throws Exception {
    final String prefix = config.getPrefix();
    services.putIfAbsent(prefix, new SockJsService(prefix, pipeline().removeFirst()));
}
We are removing the ChannelInitializer that was added by ServerBootstrap.init and associating it with a SockJsService. A SockJsService is identified by a prefix (or service name), which maps to an HTTP request path segment. This allows us to configure multiple services/handlers. SockJsMultiplexer handles all HTTP requests: it inspects the path and retrieves the SockJsService for that request. It will then add the ChannelInitializer to the pipeline and call fireChannelRegistered to have the ChannelInitializer run. Remember that this only adds a ServerBootstrapAcceptor to the pipeline and nothing more. We also need the ServerBootstrapAcceptor to be run, so we also invoke fireChannelRead. This will add the childHandler to the pipeline and set the childOptions and childAttributes on the channel.
It will also register the channel. This is a problem for the current SockJS implementation. The registering has always been there and I simply did not notice that it was getting called. But after rebasing, the channel will be closed if registration does not succeed (there will be an exception saying that the channel is already registered). I've added a check to see whether the channel is already registered, but it is more of a hack.
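The prefix-to-service bookkeeping described above can be sketched without Netty. ServiceRegistrySketch is a hypothetical class, and a plain String stands in for the SockJsService and its ChannelInitializer. The register method mirrors the putIfAbsent call in doRegister; the longest-prefix matching in resolve is an assumption about how a multiplexer could pick a service, not a description of what SockJsMultiplexer actually does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not code from the branch.
final class ServiceRegistrySketch {

    // Maps a prefix such as "/echo" to a stand-in for the registered service.
    private final ConcurrentMap<String, String> services = new ConcurrentHashMap<>();

    // As with putIfAbsent in doRegister, the first registration for a prefix wins.
    // Returns true if this call registered the service.
    boolean register(String prefix, String handlerName) {
        return services.putIfAbsent(prefix, handlerName) == null;
    }

    // Returns the service whose prefix starts the request path, or null if none matches.
    // When several prefixes match, the longest one is chosen (an assumption of this sketch).
    String resolve(String path) {
        String best = null;
        for (String prefix : services.keySet()) {
            boolean matches = path.equals(prefix) || path.startsWith(prefix + "/");
            if (matches && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best == null ? null : services.get(best);
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        registry.register("/echo", "SockJsEchoHandler");
        registry.register("/close", "SockJsCloseHandler");
        System.out.println(registry.resolve("/echo/123/123/xhr")); // prints SockJsEchoHandler
    }
}
```

Matching on the prefix followed by a slash keeps a path such as /echoes/1/2/xhr from being routed to the echo service.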
public void run() throws Exception {
    final EventLoopGroup bossGroup = new OioEventLoopGroup();
    try {
        final ServerBootstrap sb = new ServerBootstrap();
        sb.channel(OioSockJsServerChannel.class);
        sb.group(bossGroup);
        sb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
        final CorsConfig corsConfig = DefaultSockJsServiceConfig.defaultCorsConfig("test", "*", "localhost:8081")
                .allowedRequestHeaders("a", "b", "c")
                .allowNullOrigin()
                .allowedRequestMethods(POST, GET, OPTIONS)
                .build();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsEchoHandler());
            }
        });
        sb.option(PREFIX, "/echo");
        sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.childOption(HEARTBEAT_INTERVAL, 60000L);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsCloseHandler());
            }
        });
        sb.option(PREFIX, "/close");
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.childOption(HEARTBEAT_INTERVAL, 60000L);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsEchoHandler());
            }
        });
        sb.option(PREFIX, "/cookie_needed_echo");
        sb.childOption(COOKIES_NEEDED, true);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.childOption(HEARTBEAT_INTERVAL, 60000L);
        sb.register();

        sb.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(final SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new SockJsWSDisabledHandler());
            }
        });
        sb.option(PREFIX, "/disabled_websocket_echo");
        sb.childOption(WEBSOCKET_ENABLED, false);
        sb.childOption(CORS_CONFIG, corsConfig);
        sb.childOption(HEARTBEAT_INTERVAL, 60000L);
        sb.register();

        final Channel ch = sb.bind(port).sync().channel();
        System.out.println("Web socket server started on port [" + port + "], ");
        ch.closeFuture().sync();
    } finally {
        // Only one event loop group is created in the OIO setup.
        bossGroup.shutdownGracefully();
    }
}
The base ChannelHandlers in SockJS, the handlers that set up HTTP/HTTPS, can be configured using an option on the server bootstrap:
sb.option(CHANNEL_INITIALIZER, new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("decoder", new HttpRequestDecoder());
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());
        ch.pipeline().addLast("chucked", new HttpObjectAggregator(1048576));
        ch.pipeline().addLast("custom", new SimpleChannelInboundHandler<HttpRequest>() {
            @Override
            protected void messageReceived(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                System.out.println("Custom handler to show customization of SockJS is possible.");
                ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
            }
        });
        ch.pipeline().addLast("mux", new SockJsMultiplexer());
    }
});
This allows custom configuration of the handlers in case the defaults provided by SockJsChannelInitializer are not satisfactory.
- Support OIO.
- Make the SockJS handlers configurable.
- Add SSL/TLS support.
Building the sockjs-refactoring branch
There have been two changes outside of the codec-sockjs module, which means that the following two modules must be built prior to building codec-sockjs:
- Update the echo service to have a longer session timeout to allow enough time to run manual curl commands:
sb.option(PREFIX, "/echo");
sb.childOption(MAX_STREAMING_BYTES_SIZE, 4096);
sb.childOption(CORS_CONFIG, corsConfig);
sb.childOption(SESSION_TIMEOUT, 120000L);
sb.childOption(HEARTBEAT_INTERVAL, 60000L);
sb.register();
- Start the server by running the following command from the codec-sockjs directory:
mvn exec:java
- Use xhr_polling to open a session:
curl -v http://localhost:8081/echo/123/123/xhr
- Send a message:
curl -i --header "Content-Type: application/javascript" -X POST -d '["some data"]' http://localhost:8081/echo/123/123/xhr_send
- Retrieve the message using another xhr_polling request:
curl -v http://localhost:8081/echo/123/123/xhr
One of the changes made during the refactoring was to use Netty's CorsHandler, which was extracted and generalized. Doing this caused a few errors to be reported by the sockjs-protocol 0.3.3 tests because, in my opinion, the CORS handling in a few of the tests is not correct. The following pull requests have been opened for this:
There was also an issue with the parsing of HTTP headers:
These two are included in this branch and can be used to run the sockjs-protocol testsuite.
- Start the Netty SockJS Server:
cd netty/codec-sockjs
mvn exec:java
- Run the sockjs-protocol testsuite:
cd sockjs-protocol
make test_deps (only required to be run once)
./venv/bin/python sockjs-protocol-0.3.3.py
@normanmaurer: I've removed the SockJsEventLoop and SockJsEventLoopGroup classes now.
Yeah, you are right, this would only work with a NioSocketChannel at the moment. Let me try to address this. Thx!