Skip to content

Instantly share code, notes, and snippets.

@azinman
Created January 14, 2011 07:07
Show Gist options
  • Save azinman/779303 to your computer and use it in GitHub Desktop.
Save azinman/779303 to your computer and use it in GitHub Desktop.
Testing 50k+ concurrent connections using netty
package com.mypackage.benchmark
import com.mypackage.util.Logging
import net.lag.configgy.Configgy
import scala.collection.mutable
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.nio.charset.Charset
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.group._
import org.jboss.netty.handler.codec.string._
import org.jboss.netty.handler.codec.frame.Delimiters
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder
import org.jboss.netty.util.HashedWheelTimer
import org.jboss.netty.util.TimerTask
import org.jboss.netty.util.Timeout
/**
 * Entry point of the Netty benchmark client.
 *
 * Reads `hostname`, `port` and `numConns` from `config/nettybenchmark.conf`,
 * opens `numConns` connections to that address, reports how many connection
 * attempts succeeded / failed / were cancelled, keeps the connections open for
 * `TEST_TIME_SEC` seconds, then disconnects everything, prints response-time
 * statistics and exits the JVM.
 */
object BenchmarkClientMain extends Logging {
  val bossExecutor: ExecutorService = Executors.newCachedThreadPool()
  val workerExecutor: ExecutorService = Executors.newCachedThreadPool()
  val channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor)
  val bootstrap = new ClientBootstrap(channelFactory)
  val timer = new HashedWheelTimer()

  // Outcome counters, updated from the connect-future listener.
  val failures = new AtomicInteger(0)
  val successes = new AtomicInteger(0)
  val cancelled = new AtomicInteger(0)
  // Distinct failure descriptions (Throwable.toString of each failed connect).
  val failureCauses = new mutable.HashSet[String] with mutable.SynchronizedSet[String]

  // Number of finished connection attempts, whatever their outcome. A single
  // counter lets exactly one listener invocation observe "this was the last one".
  private val completed = new AtomicInteger(0)

  // Number of connections to open; assigned from the configuration in main.
  var numConns = 0
  // How long the connections are kept open once all attempts have completed.
  val TEST_TIME_SEC = 20
  // Wall-clock time (msec) at which the connection attempts were started.
  var startTime: Long = 0

  /** Fails with a message naming the configuration key that is missing. */
  private def missingSetting(key: String): Nothing =
    throw new IllegalArgumentException(
      "Missing required setting '" + key + "' in config/nettybenchmark.conf")

  /**
   * Loads the configuration, configures the bootstrap and fires off all
   * connection attempts. Returns as soon as the attempts have been issued;
   * the rest of the run is driven by `newChannelListener`.
   *
   * @throws IllegalArgumentException if `hostname`, `port` or `numConns` is not configured
   */
  def main(args: Array[String]): Unit = {
    Configgy.configure("config/nettybenchmark.conf")
    log.info("Launching Netty Client benchmark")
    configureBootstrap
    val config = Configgy.config
    val hostname = config.getString("hostname").getOrElse(missingSetting("hostname"))
    val port = config.getInt("port").getOrElse(missingSetting("port"))
    numConns = config.getInt("numConns").getOrElse(missingSetting("numConns"))
    startTime = System.currentTimeMillis
    for (i <- 0 until numConns) {
      val future: ChannelFuture = bootstrap.connect(new InetSocketAddress(hostname, port))
      future.addListener(newChannelListener)
    }
    log.info("Boostrapped %d connections in %d msec", numConns, System.currentTimeMillis - startTime)
  }

  /**
   * Installs the pipeline factory (line-delimited UTF-8 strings handled by a
   * [[BenchmarkClientHandler]]) and the socket options on the bootstrap.
   */
  def configureBootstrap: Unit = {
    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      def getPipeline(): ChannelPipeline = {
        val pipeline = Channels.pipeline()
        val delimiters = Delimiters.lineDelimiter()
        pipeline.addLast("frameDecoder",
          new DelimiterBasedFrameDecoder(65536, true, delimiters(0), delimiters(1)))
        pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")))
        pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")))
        pipeline.addLast("handler", new BenchmarkClientHandler())
        pipeline
      }
    })
    bootstrap.setOption("tcpNoDelay", true)
    bootstrap.setOption("keepAlive", true)
    bootstrap.setOption("connectTimeoutMillis", 10000)
    // NOTE(review): client bootstraps normally take the un-prefixed key
    // "reuseAddress"; this prefixed key is probably ignored -- confirm against
    // the Netty version in use before changing it.
    bootstrap.setOption("client.reuseAddress", true)
  }

  /**
   * Called once every connection attempt has completed: logs the outcome
   * summary and schedules `endTestTimerTask` to run after `TEST_TIME_SEC` seconds.
   */
  def lastChannelConnected: Timeout = {
    log.info("Finished completing all connections:")
    log.info("\tTook %2.2f sec total", (System.currentTimeMillis - startTime) / 1000.0)
    log.info("\tSucesses: " + successes.get)
    log.info("\tFailures: " + failures.get)
    log.info("\tCancelled: " + cancelled.get)
    if (!failureCauses.isEmpty) {
      log.info("Failure reasons:")
      failureCauses.foreach((failure) => log.info("\t" + failure))
    }
    // Hold the connections open for TEST_TIME_SEC seconds before closing
    // everything and finishing.
    timer.start
    log.info("Waiting " + TEST_TIME_SEC + " seconds...")
    timer.newTimeout(endTestTimerTask, TEST_TIME_SEC, TimeUnit.SECONDS)
  }

  /**
   * Timer task that ends the test: disconnects every tracked channel and,
   * once the group operation completes, releases resources and exits.
   */
  lazy val endTestTimerTask = new TimerTask() {
    def run(timeout: Timeout) = {
      log.info("Client timeout reached; closing all channels")
      BenchmarkClientHandler.channels.disconnect.addListener(new ChannelGroupFutureListener() {
        def operationComplete(future: ChannelGroupFuture) = {
          // This callback may be running on one of the threads owned by the
          // executors that are about to be shut down (or on the timer thread).
          // Releasing them from such a thread cannot complete cleanly, so the
          // shutdown is handed to a dedicated thread.
          val shutdownThread = new Thread(new Runnable {
            def run(): Unit = releaseAndExit()
          }, "benchmark-client-shutdown")
          shutdownThread.start()
        }
      })
    }
  }

  /** Releases Netty resources, prints the statistics and terminates the JVM. */
  private def releaseAndExit(): Unit = {
    log.info("Releasing resources & shutting down threads")
    // Shutdown thread pools and release resources
    bootstrap.releaseExternalResources()
    channelFactory.releaseExternalResources()
    bossExecutor.shutdownNow()
    workerExecutor.shutdownNow()
    printStats
    log.info("Exiting")
    // The timer is never stopped, so its thread is still running: exit explicitly.
    System.exit(0)
  }

  /**
   * Logs average / min / max of the recorded response times, or a single
   * line saying that nothing was recorded when no response arrived.
   */
  def printStats: Unit = {
    val responseTimes = BenchmarkClientHandler.responseTimes.toList
    if (responseTimes.isEmpty) {
      // reduceLeft / min / max all throw on an empty list.
      log.info("Response times: no responses were recorded")
    } else {
      val sum = responseTimes.reduceLeft[Long](_ + _)
      val avg = sum / responseTimes.size
      log.info("Response times:")
      log.info("\tAverage: " + avg + "msec")
      log.info("\tMin: " + responseTimes.min + "msec")
      log.info("\tMax: " + responseTimes.max + "msec")
    }
  }

  /**
   * Listener attached to every connect future: classifies the outcome and,
   * for the attempt that completes last, triggers `lastChannelConnected`.
   */
  lazy val newChannelListener = new ChannelFutureListener() {
    def operationComplete(future: ChannelFuture): Unit = {
      assert(future.isDone)
      if (future.isCancelled()) {
        // Connection attempt cancelled by user
        cancelled.incrementAndGet
      } else if (!future.isSuccess) {
        val reason = Option(future.getCause()).map(_.toString).getOrElse("unknown cause")
        failureCauses.add(reason)
        failures.incrementAndGet
      } else {
        // Connection established successfully
        successes.incrementAndGet
      }
      // The outcome counter above is bumped before `completed`, so the one
      // invocation that sees the final count also sees complete outcome totals.
      if (completed.incrementAndGet == numConns) {
        BenchmarkClientMain.lastChannelConnected
      }
    }
  }
}
/** State shared by all [[BenchmarkClientHandler]] instances of the client. */
object BenchmarkClientHandler {
  /** Channels registered by the handlers on connect and removed again on close. */
  val channels: DefaultChannelGroup = new DefaultChannelGroup()

  /** Time differences (msec) recorded by the handlers, one entry per received message. */
  val responseTimes: ConcurrentLinkedQueue[Long] = new ConcurrentLinkedQueue[Long]()
}
/**
 * Per-connection handler of the benchmark client.
 *
 * Registers the channel in the shared channel group, sends a greeting once
 * connected, and records `receive time - timestamp carried by the message`
 * for every message received.
 */
class BenchmarkClientHandler extends SimpleChannelUpstreamHandler with Logging {

  /** Tracks the new channel and sends the greeting line. */
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelConnected(ctx, e)
    BenchmarkClientHandler.channels.add(ctx.getChannel)
    e.getChannel.write("Hello there!\n")
  }

  /** Stops tracking the channel once it is closed. */
  override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelClosed(ctx, e)
    BenchmarkClientHandler.channels.remove(ctx.getChannel)
  }

  /**
   * Parses the message as a millisecond timestamp and records how long ago
   * that was. Anything that is not a numeric string is logged and skipped:
   * letting the parse error escape would be routed to `exceptionCaught`,
   * which closes the connection.
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    val received = System.currentTimeMillis
    e.getMessage match {
      case text: String =>
        try {
          // NOTE(review): the timestamp presumably comes from the server's
          // clock, so the difference is only meaningful when both clocks
          // agree -- confirm for multi-host runs.
          val sentTime = text.toLong
          BenchmarkClientHandler.responseTimes.add(received - sentTime)
        } catch {
          case _: NumberFormatException =>
            log.warning("Ignoring non-numeric message: %s", text)
        }
      case other =>
        log.warning("Ignoring unexpected message: %s", other)
    }
  }

  /** Logs the failure and closes the affected connection. */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    // Close the connection when an exception is raised.
    log.warning("Unexpected exception from downstream: %s", e.getCause())
    // No explicit removal from the channel group here; channelClosed does it.
    e.getChannel().close()
  }
}
package com.mypackage.benchmark
import com.mypackage.util.Logging
import net.lag.configgy.Configgy
import java.net.InetSocketAddress
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong
import java.nio.charset.Charset
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import org.jboss.netty.channel.group._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.execution.ExecutionHandler
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
import org.jboss.netty.handler.codec.string._
import org.jboss.netty.handler.codec.frame.Delimiters
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder
import org.jboss.netty.handler.timeout._
import org.jboss.netty.util.HashedWheelTimer
/**
 * Entry point of the Netty benchmark server.
 *
 * Binds to the `port` configured in `config/nettybenchmark.conf` and serves
 * line-delimited UTF-8 strings with a [[BenchmarkServerHandler]] per connection.
 */
object BenchmarkServerMain extends Logging {
  // Configure the server
  val bossExecutor: ExecutorService = Executors.newCachedThreadPool()
  val workerExecutor: ExecutorService = Executors.newCachedThreadPool()
  val channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor)
  val bootstrap = new ServerBootstrap(channelFactory)
  // Timer handed to the per-connection IdleStateHandler.
  val timer = new HashedWheelTimer()
  val orderedMemoryAwareThreadPoolExecutor = new OrderedMemoryAwareThreadPoolExecutor(
    100, // core pool size
    0, // maxChannelMemorySize, 0 to disable,
    0 // maxTotalMemorySize, 0 to disable
  )
  // Shared by every pipeline; placed just before the business handler so the
  // handler runs on the executor above.
  val executionHandler = new ExecutionHandler(orderedMemoryAwareThreadPoolExecutor)

  /**
   * Configures the bootstrap and starts accepting connections.
   *
   * @throws IllegalArgumentException if `port` is not configured
   */
  def main(args: Array[String]): Unit = {
    Configgy.configure("config/nettybenchmark.conf")
    log.info("Launching Netty Server benchmark")
    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      def getPipeline(): ChannelPipeline = {
        val pipeline = Channels.pipeline()
        val delimiters = Delimiters.lineDelimiter()
        pipeline.addLast("frameDecoder",
          new DelimiterBasedFrameDecoder(65536, true, delimiters(0), delimiters(1)))
        pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")))
        pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")))
        // Arguments after the timer: reader idle 0, writer idle 5, all idle 0.
        pipeline.addLast("idleHandler", new IdleStateHandler(timer, 0, 5, 0))
        pipeline.addLast("pipelineExecuter", executionHandler)
        pipeline.addLast("handler", new BenchmarkServerHandler())
        pipeline
      }
    })
    bootstrap.setOption("child.keepAlive", true) // for mobiles & our stateful app
    bootstrap.setOption("child.tcpNoDelay", true) // better latency over bandwidth
    bootstrap.setOption("reuseAddress", true) // kernel optimization
    bootstrap.setOption("child.reuseAddress", true) // kernel optimization
    val config = Configgy.config
    val port = config.getInt("port").getOrElse(
      throw new IllegalArgumentException(
        "Missing required setting 'port' in config/nettybenchmark.conf"))
    // Bind and start to accept incoming connections. The listening channel is
    // put into the same group as the accepted ones so that shutdown closes it
    // too before the channel factory is released.
    val serverChannel = bootstrap.bind(new InetSocketAddress(port))
    BenchmarkServerHandler.channels.add(serverChannel)
  }

  /**
   * Closes every tracked channel (waiting until that has completed), stops
   * the idle timer, then releases the bootstrap, the handler executor and
   * the I/O thread pools.
   */
  def shutdown = {
    BenchmarkServerHandler.channels.close().awaitUninterruptibly()
    timer.stop()
    bootstrap.releaseExternalResources()
    orderedMemoryAwareThreadPoolExecutor.shutdownNow()
    channelFactory.releaseExternalResources()
    bossExecutor.shutdownNow()
    workerExecutor.shutdownNow()
  }
}
/** State shared by all [[BenchmarkServerHandler]] instances of the server. */
object BenchmarkServerHandler {
  /** Channels registered by the handlers on connect and removed again on close. */
  val channels: DefaultChannelGroup = new DefaultChannelGroup()
}
/**
 * Per-connection handler of the benchmark server.
 *
 * Discards whatever the client sends and, whenever the connection has been
 * write-idle, sends the current time in milliseconds as a keep-alive line.
 */
class BenchmarkServerHandler extends IdleStateAwareChannelUpstreamHandler with Logging {

  /** On a writer-idle event, sends the current timestamp followed by a newline. */
  override def channelIdle(ctx: ChannelHandlerContext, e: IdleStateEvent): Unit = {
    // send keep alive ping
    if (e.getState == IdleState.WRITER_IDLE) {
      val now = System.currentTimeMillis
      ctx.getChannel.write(now + "\n")
    }
  }

  /** Received data is intentionally discarded. */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    // Discard received data silently by doing nothing.
  }

  /** Logs the failure and closes the affected connection. */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    // Close the connection when an exception is raised.
    // The message needs a placeholder, otherwise the cause passed as a format
    // argument never shows up in the log.
    log.warning("Unexpected exception from downstream: %s", e.getCause())
    e.getChannel().close()
  }

  /** Tracks the newly accepted channel in the shared channel group. */
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelConnected(ctx, e)
    BenchmarkServerHandler.channels.add(ctx.getChannel)
  }

  /** Stops tracking the channel once it is closed. */
  override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelClosed(ctx, e)
    BenchmarkServerHandler.channels.remove(ctx.getChannel)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment