Skip to content

Instantly share code, notes, and snippets.

@olauzon
Created April 2, 2014 20:18
Show Gist options
  • Save olauzon/9942256 to your computer and use it in GitHub Desktop.
Save olauzon/9942256 to your computer and use it in GitHub Desktop.
Port Netty in Action examples from Java to Clojure
(ns clj-tcp.netty.async.client
(:require [clj-tcp.netty.async.client-handler :as handler])
(:import [java.net InetSocketAddress]
[io.netty.bootstrap Bootstrap]
[io.netty.channel ChannelFuture
ChannelHandler
ChannelInitializer]
[io.netty.channel.nio NioEventLoopGroup]
[io.netty.channel.socket SocketChannel]
[io.netty.channel.socket.nio NioSocketChannel]))
(defn start!
[^String host ^Integer port channel-handler]
(let [group ^EventLoopGroup (NioEventLoopGroup.)
b ^Bootstrap (Bootstrap.)]
(try
(-> b
(.group group)
(.channel io.netty.channel.socket.nio.NioSocketChannel)
(.remoteAddress (InetSocketAddress. host port))
(.handler
(proxy [ChannelInitializer] []
(initChannel
[^SocketChannel ch]
(-> ch
(.pipeline)
(.addLast (into-array ChannelHandler [channel-handler])))))))
(let [channel-future ^ChannelFuture (-> b (.connect) (.sync))]
(-> channel-future
(.channel)
(.closeFuture)
(.sync)))
(finally
(-> group
(.shutdownGracefully)
(.sync))))))
(defn -main
[host port]
(start! host port (handler/make)))
(ns clj-tcp.netty.async.client-handler
(:import [io.netty.buffer ByteBuf
ByteBufUtil
Unpooled]
[io.netty.channel ChannelHandlerContext
SimpleChannelInboundHandler]
[io.netty.util CharsetUtil]))
(defn make
[]
(proxy [SimpleChannelInboundHandler] []
(channelActive
[^ChannelHandlerContext ctx]
(.writeAndFlush
ctx
(Unpooled/copiedBuffer "Netty rocks!" CharsetUtil/UTF_8)))
(channelRead0
[^ChannelHandlerContext ctx ^ByteBuf in]
(println (ByteBufUtil/hexDump in)))
(exceptionCaught
[^ChannelHandlerContext ctx ^Throwable cause]
(.printStackTrace cause)
(.close ctx))))
package com.manning.nettyinaction.chapter2;
import java.net.InetSocketAddress;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Listing 2.5 of <i>Netty in Action</i>
*
* @author <a href="mailto:[email protected]">Norman Maurer</a>
*/
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>");
return;
}
final String host = args[0];
final int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
package com.manning.nettyinaction.chapter2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* Listing 2.6 of <i>Netty in Action</i>
*
* @author <a href="mailto:[email protected]">Norman Maurer</a>
*/
@Sharable
public class EchoClientHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
@Override
public void channelRead0(ChannelHandlerContext ctx,
ByteBuf in) {
System.out.println("Client received: " + ByteBufUtil
.hexDump(in));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package com.manning.nettyinaction.chapter2;
import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Listing 2.3 of <i>Netty in Action</i>
*
* @author <a href="mailto:[email protected]">Norman Maurer</a>
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoServerHandler());
}
});
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println(
"Usage: " + EchoServer.class.getSimpleName() +
" <port>");
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}
package com.manning.nettyinaction.chapter2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Listing 2.4 of <i>Netty in Action</i>
*
* @author <a href="mailto:[email protected]">Norman Maurer</a>
*/
@Sharable
public class EchoServerHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + ByteBufUtil
.hexDump(in));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
;; https://gist.github.com/vadali/5142879
;; http://blog.quiptiq.com/2013/05/20/porting-the-netty-discardserver-example-to-clojure/
(ns clj-tcp.netty.async.server
(:require [clojure.pprint :refer [pprint]]
[clj-tcp.netty.async.server-handler :as handler])
(:import [java.net InetSocketAddress]
[io.netty.bootstrap ServerBootstrap]
[io.netty.channel ChannelFuture
ChannelHandler
ChannelInitializer]
[io.netty.channel.nio NioEventLoopGroup]
[io.netty.channel.socket SocketChannel]
[io.netty.channel.socket.nio NioServerSocketChannel]))
(defn start!
[port channel-handler]
(let [group (NioEventLoopGroup.)
b (ServerBootstrap.)]
(try
(-> b
(.group group)
(.channel io.netty.channel.socket.nio.NioServerSocketChannel)
(.localAddress (InetSocketAddress. ^Integer port))
(.childHandler
(proxy [ChannelInitializer] []
(initChannel
[^SocketChannel ch]
(-> ch
(.pipeline)
(.addLast (into-array ChannelHandler [channel-handler])))))))
(let [channel-future ^ChannelFuture (-> b (.bind) (.sync))]
(-> channel-future
(.channel)
(.closeFuture)
(.sync)))
(finally
(-> group
(.shutdownGracefully)
(.sync))))))
(defn -main
[]
(start! 8080 (handler/make)))
(ns clj-tcp.netty.async.server-handler
(:require [clojure.pprint :refer [pprint]])
(:import [io.netty.buffer ByteBuf
ByteBufUtil
Unpooled]
[io.netty.channel ChannelFutureListener
ChannelHandlerContext
ChannelInboundHandlerAdapter]))
(defn make
[]
(proxy [ChannelInboundHandlerAdapter] []
(channelRead
[^ChannelHandlerContext ctx ^Object msg]
(println msg)
(.write ctx msg))
(channelReadComplete
[^ChannelHandlerContext ctx]
(.flush ctx))
(exceptionCaught
[^ChannelHandlerContext ctx ^Throwable cause]
(.printStackTrace cause)
(.close ctx))
(isSharable [] true)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment