Skip to content

Instantly share code, notes, and snippets.

@kevinlynx
Last active December 25, 2015 10:29
Show Gist options
  • Select an option

  • Save kevinlynx/6961997 to your computer and use it in GitHub Desktop.

Select an option

Save kevinlynx/6961997 to your computer and use it in GitHub Desktop.
scala netty Discard Server sample
package netty.sample
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.buffer.ByteBuf
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelOption._
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
/**
 * Inbound handler for the discard server: prints every received buffer as
 * US-ASCII text and then drops it.
 */
class DiscardServerHandler extends ChannelInboundHandlerAdapter {

  /**
   * Prints the payload of an inbound `ByteBuf` and discards the message.
   *
   * @param ctx the handler context of the channel the message arrived on
   * @param msg the inbound message
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: Object): Unit = {
    try {
      msg match {
        case b: ByteBuf =>
          println(b.toString(io.netty.util.CharsetUtil.US_ASCII))
        case _ =>
          // Not a ByteBuf: nothing to print, the message is simply discarded.
      }
    } finally {
      // This handler is the last consumer of the message, so it must release
      // it; otherwise reference-counted buffers leak.
      io.netty.util.ReferenceCountUtil.release(msg)
    }
  }

  /**
   * Prints the stack trace of the failure and closes the channel.
   *
   * @param ctx   the handler context of the failing channel
   * @param cause the exception that was raised
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    ctx.close()
  }
}
/**
 * Entry point of the sample: starts a Netty server on port 8000 that attaches
 * a `DiscardServerHandler` to every accepted connection.
 */
object DiscardServer {

  /**
   * Boots the server and blocks until its server channel is closed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val acceptors = new NioEventLoopGroup
    val workers = new NioEventLoopGroup
    try {
      // Installs the sample handler at the end of each new connection's pipeline.
      val initializer = new ChannelInitializer[SocketChannel]() {
        def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new DiscardServerHandler())
        }
      }

      val bootstrap = new ServerBootstrap()
      bootstrap.group(acceptors, workers)
      bootstrap.channel(classOf[NioServerSocketChannel])
      bootstrap.childHandler(initializer)
      bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128)
      bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)

      // Bind to the port and wait for the bind to complete.
      val bound: ChannelFuture = bootstrap.bind(8000).sync()

      // Block until the server socket is closed. Nothing in this sample closes
      // it, but waiting here is what allows a graceful shutdown below.
      bound.channel().closeFuture().sync()
    } finally {
      workers.shutdownGracefully()
      acceptors.shutdownGracefully()
    }
  }
}
package netty.sample
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.buffer.ByteBuf
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel._
import io.netty.channel.nio._
import io.netty.channel.socket._
import io.netty.channel.socket.nio._
import io.netty.handler.codec._
/**
 * Inbound handler that greets each new connection with "hello there" and then
 * echoes back the first byte of every inbound buffer; a 'q' closes the channel.
 */
class DiscardServerHandler extends ChannelInboundHandlerAdapter {

  /**
   * Sends the greeting as soon as the channel becomes active.
   *
   * @param ctx the handler context of the newly active channel
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    val buf = ctx.alloc().buffer(100)
    buf.writeBytes("hello there".getBytes(io.netty.util.CharsetUtil.US_ASCII))
    ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
      override def operationComplete(ch: ChannelFuture): Unit = {
        // Intentionally empty; call ctx.close() here to hang up after greeting.
        //ctx.close
      }
    })
  }

  /**
   * Reads one byte from an inbound `ByteBuf`, prints it, and either closes the
   * channel (byte is 'q') or writes the byte back to the peer.
   *
   * @param ctx the handler context of the channel the message arrived on
   * @param msg the inbound message
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: Object): Unit = {
    try {
      msg match {
        // Guard against empty buffers: readByte would throw on them.
        case b: ByteBuf if b.isReadable =>
          // NOTE(review): only the first byte of each buffer is handled; any
          // remaining bytes are dropped. Confirm this is intended.
          val v = b.readByte
          println(v.toChar)
          if (v == 'q'.toByte) {
            ctx.close()
          } else {
            val res = ctx.alloc().buffer()
            res.writeByte(v)
            ctx.writeAndFlush(res)
          }
        case _ =>
          // Empty buffer or non-ByteBuf message: nothing to echo.
      }
    } finally {
      // The inbound message is not passed on, so it must be released here;
      // otherwise reference-counted buffers leak.
      io.netty.util.ReferenceCountUtil.release(msg)
    }
  }

  /**
   * Prints the stack trace of the failure and closes the channel.
   *
   * @param ctx   the handler context of the failing channel
   * @param cause the exception that was raised
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    cause.printStackTrace()
    ctx.close()
  }
}
/**
 * Entry point of the sample: listens on port 8000 and hands every accepted
 * connection to a fresh `DiscardServerHandler`.
 */
object DiscardServer {

  /**
   * Configures and starts the server, then blocks until the listening channel
   * has been closed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val parentLoop = new NioEventLoopGroup
    val childLoop = new NioEventLoopGroup
    try {
      val server = new ServerBootstrap()
      server.group(parentLoop, childLoop)
      server.channel(classOf[NioServerSocketChannel])
      // Each accepted connection gets its own handler instance.
      server.childHandler(new ChannelInitializer[SocketChannel]() {
        def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new DiscardServerHandler())
        }
      })
      server.option[java.lang.Integer](ChannelOption.SO_BACKLOG, 128)
      server.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)

      // Start accepting connections once the bind has completed.
      val listening: ChannelFuture = server.bind(8000).sync()

      // Park the main thread until the server socket closes. This sample never
      // closes it, but this is the hook for a graceful shutdown.
      listening.channel().closeFuture().sync()
    } finally {
      childLoop.shutdownGracefully()
      parentLoop.shutdownGracefully()
    }
  }
}
/**
 * Outbound encoder that serialises a `UnixTime` message by writing its `t`
 * field to the output buffer with `writeInt`.
 *
 * NOTE(review): `UnixTime` is not defined in this file; presumably `t` is an
 * Int holding a timestamp. Confirm against its declaration.
 */
class TimeEncoder extends MessageToByteEncoder[UnixTime] {

  /**
   * @param ctx the handler context of the channel being written to
   * @param msg the message to encode
   * @param out the buffer that receives the encoded bytes
   */
  def encode(ctx: ChannelHandlerContext, msg: UnixTime, out: ByteBuf): Unit = {
    val seconds = msg.t
    out.writeInt(seconds)
  }
}
@ppopoff
Copy link
Copy Markdown

ppopoff commented Dec 27, 2014

Please, extend SimpleChannelInboundHandler[ByteBuf] instead of ChannelInboundHandlerAdapter.

It will require you to override the channelRead0 method instead of channelRead.
def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit

That will eliminate casting, because you will accept a ByteBuf instead of a java.lang.Object, and most importantly: it will release the ByteBuf automatically (which your code does not do). Please take a look at the channelRead method:

https://github.com/netty/netty/blob/5725e804bcfbd0c76e0f11e7911a76970eb06c8c/transport/src/main/java/io/netty/channel/SimpleChannelInboundHandler.java

They release the resources there. Unless you do the same, your discard server may leak.

@firegurafiku
Copy link
Copy Markdown

Broken indentation on line 38 in netty-discardserver.scala, that's sad :(.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment