Created
January 14, 2011 07:07
-
-
Save azinman/779303 to your computer and use it in GitHub Desktop.
Testing 50k+ concurrent connections using netty
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mypackage.benchmark | |
import com.mypackage.util.Logging | |
import net.lag.configgy.Configgy | |
import scala.collection.mutable | |
import scala.collection.JavaConversions._ | |
import java.net.InetSocketAddress | |
import java.nio.charset.Charset | |
import java.util.concurrent._ | |
import java.util.concurrent.atomic.AtomicInteger | |
import org.jboss.netty.buffer.ChannelBuffer | |
import org.jboss.netty.buffer.ChannelBuffers | |
import org.jboss.netty.bootstrap.ClientBootstrap | |
import org.jboss.netty.channel.ChannelFuture | |
import org.jboss.netty.channel.ChannelPipeline | |
import org.jboss.netty.channel.ChannelPipelineFactory | |
import org.jboss.netty.channel.Channels | |
import org.jboss.netty.channel.ChannelHandlerContext | |
import org.jboss.netty.channel.ChannelFutureListener | |
import org.jboss.netty.channel.ChannelStateEvent | |
import org.jboss.netty.channel.ExceptionEvent | |
import org.jboss.netty.channel.MessageEvent | |
import org.jboss.netty.channel.SimpleChannelUpstreamHandler | |
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory | |
import org.jboss.netty.channel.group._ | |
import org.jboss.netty.handler.codec.string._ | |
import org.jboss.netty.handler.codec.frame.Delimiters | |
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder | |
import org.jboss.netty.util.HashedWheelTimer | |
import org.jboss.netty.util.TimerTask | |
import org.jboss.netty.util.Timeout | |
/**
 * Benchmark client entry point.
 *
 * Reads `hostname`, `port` and `numConns` from `config/nettybenchmark.conf`,
 * opens `numConns` connections through a shared [[ClientBootstrap]], and once
 * every connection attempt has finished (succeeded, failed or been cancelled)
 * waits `TEST_TIME_SEC` seconds before disconnecting all channels, printing the
 * collected response-time statistics and exiting the JVM.
 */
object BenchmarkClientMain extends Logging {
  val bossExecutor: ExecutorService = Executors.newCachedThreadPool()
  val workerExecutor: ExecutorService = Executors.newCachedThreadPool()
  val channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor)
  val bootstrap = new ClientBootstrap(channelFactory)
  val timer = new HashedWheelTimer()

  // Per-outcome counters, updated from the connect-future listener.
  val failures = new AtomicInteger(0)
  val successes = new AtomicInteger(0)
  val cancelled = new AtomicInteger(0)

  // Number of finished connection attempts regardless of outcome. A single
  // counter lets exactly one listener invocation observe "all done"; summing
  // the three counters above is not atomic and could match more than once.
  private val completed = new AtomicInteger(0)

  // Distinct failure descriptions (Throwable.toString) seen while connecting.
  val failureCauses = new mutable.HashSet[String] with mutable.SynchronizedSet[String]

  var numConns = 0
  val TEST_TIME_SEC = 20
  var startTime: Long = 0

  /**
   * Configures logging/config, sets up the bootstrap and fires off all
   * connection attempts. Connection results are handled asynchronously by
   * `newChannelListener`.
   */
  def main(args: Array[String]): Unit = {
    Configgy.configure("config/nettybenchmark.conf")
    log.info("Launching Netty Client benchmark")
    configureBootstrap

    val config = Configgy.config
    val hostname = config.getString("hostname").get
    val port = config.getInt("port").get
    numConns = config.getInt("numConns").get

    startTime = System.currentTimeMillis
    for (i <- 0 until numConns) {
      val future: ChannelFuture = bootstrap.connect(new InetSocketAddress(hostname, port))
      future.addListener(newChannelListener)
    }
    log.info("Boostrapped %d connections in %d msec", numConns, System.currentTimeMillis - startTime)
  }

  /**
   * Installs the pipeline factory (line-delimited UTF-8 strings handled by a
   * [[BenchmarkClientHandler]]) and the socket options on the bootstrap.
   */
  def configureBootstrap: Unit = {
    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      def getPipeline(): ChannelPipeline = {
        val pipeline = Channels.pipeline()
        pipeline.addLast("frameDecoder",
          new DelimiterBasedFrameDecoder(65536, true,
            Delimiters.lineDelimiter()(0),
            Delimiters.lineDelimiter()(1)))
        pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")))
        pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")))
        pipeline.addLast("handler", new BenchmarkClientHandler())
        pipeline
      }
    })
    bootstrap.setOption("tcpNoDelay", true)
    bootstrap.setOption("keepAlive", true)
    bootstrap.setOption("connectTimeoutMillis", 10000)
    // NOTE(review): the other client options above are un-prefixed; confirm that
    // the "client." prefix is actually honoured for this key.
    bootstrap.setOption("client.reuseAddress", true)
  }

  /**
   * Called once every connection attempt has completed. Logs the outcome
   * summary and schedules `endTestTimerTask` to run after `TEST_TIME_SEC`
   * seconds.
   *
   * @return the timeout handle of the scheduled end-of-test task
   */
  def lastChannelConnected: Timeout = {
    log.info("Finished completing all connections:")
    log.info("\tTook %2.2f sec total", (System.currentTimeMillis - startTime) / 1000.0)
    log.info("\tSucesses: " + successes.get)
    log.info("\tFailures: " + failures.get)
    log.info("\tCancelled: " + cancelled.get)
    if (!failureCauses.isEmpty) {
      log.info("Failure reasons:")
      failureCauses.foreach((failure) => log.info("\t" + failure))
    }
    // Keep the connections open for TEST_TIME_SEC seconds before closing
    // everything and finishing.
    timer.start
    log.info("Waiting " + TEST_TIME_SEC + " seconds...")
    timer.newTimeout(endTestTimerTask, TEST_TIME_SEC, TimeUnit.SECONDS)
  }

  /**
   * Timer task that ends the test: disconnects every tracked channel and, once
   * the group disconnect completes, releases resources, prints statistics and
   * exits the JVM.
   */
  lazy val endTestTimerTask: TimerTask = new TimerTask() {
    def run(timeout: Timeout) = {
      log.info("Client timeout reached; closing all channels")
      // Continue only after the whole channel group has finished disconnecting.
      BenchmarkClientHandler.channels.disconnect.addListener(new ChannelGroupFutureListener() {
        def operationComplete(future: ChannelGroupFuture) = {
          log.info("Releasing resources & shutting down threads")
          // Shutdown thread pools and release resources
          bootstrap.releaseExternalResources
          channelFactory.releaseExternalResources
          bossExecutor.shutdownNow
          workerExecutor.shutdownNow
          printStats
          log.info("Exiting")
          // Timer is still running, so we must System.exit
          System.exit(0)
        }
      })
    }
  }

  /**
   * Logs average/min/max of the response times recorded by
   * [[BenchmarkClientHandler]]. When no response was recorded a single
   * explanatory line is logged instead; reducing an empty list would throw and
   * prevent the caller from reaching `System.exit`.
   */
  def printStats: Unit = {
    val samples = BenchmarkClientHandler.responseTimes.toList
    if (samples.isEmpty) {
      log.info("Response times: no responses received")
    } else {
      val total = samples.reduceLeft[Long](_ + _)
      val avg = total / samples.size
      log.info("Response times:")
      log.info("\tAverage: " + avg + "msec")
      log.info("\tMin: " + samples.min + "msec")
      log.info("\tMax: " + samples.max + "msec")
    }
  }

  /**
   * Listener attached to every connect future. Records the outcome of the
   * attempt and triggers `lastChannelConnected` exactly once, when the final
   * attempt completes.
   */
  lazy val newChannelListener: ChannelFutureListener = new ChannelFutureListener() {
    def operationComplete(future: ChannelFuture) = {
      assert(future.isDone)
      if (future.isCancelled()) {
        // Connection attempt cancelled by user
        cancelled.incrementAndGet
      } else if (!future.isSuccess) {
        val reason = future.getCause().toString
        failureCauses.add(reason)
        failures.incrementAndGet
      } else {
        // Connection established successfully
        successes.incrementAndGet
      }
      // incrementAndGet hands each caller a distinct value, so only the listener
      // that completes the last attempt enters this branch.
      if (completed.incrementAndGet == numConns) {
        BenchmarkClientMain.lastChannelConnected
      }
    }
  }
}
/**
 * State shared by all [[BenchmarkClientHandler]] instances.
 */
object BenchmarkClientHandler {
  /** Group to which handlers add their channel on connect and remove it on close. */
  val channels: DefaultChannelGroup = new DefaultChannelGroup()

  /** Response-time samples appended by `messageReceived`, one per received message. */
  val responseTimes: ConcurrentLinkedQueue[Long] = new ConcurrentLinkedQueue[Long]()
}
/**
 * Per-connection client handler: registers the channel in the shared group,
 * sends a greeting on connect, and records one response-time sample for every
 * numeric message received.
 */
class BenchmarkClientHandler extends SimpleChannelUpstreamHandler with Logging {

  /** Tracks the newly connected channel and sends the initial greeting line. */
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelConnected(ctx, e)
    BenchmarkClientHandler.channels.add(ctx.getChannel)
    e.getChannel.write("Hello there!\n")
  }

  /** Stops tracking the channel once it has been closed. */
  override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelClosed(ctx, e)
    BenchmarkClientHandler.channels.remove(ctx.getChannel)
  }

  /**
   * Interprets the message as a millisecond timestamp and records the
   * difference between the local receive time and that timestamp.
   *
   * Messages that are not strings, or strings that do not parse as a `Long`,
   * are logged and skipped instead of raising an exception, so a single
   * malformed line does not cause `exceptionCaught` to close the connection.
   */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    // Capture the receive time first so parsing cost is not part of the sample.
    val received = System.currentTimeMillis
    e.getMessage match {
      case text: String =>
        try {
          val sentTime = text.toLong
          BenchmarkClientHandler.responseTimes.add(received - sentTime)
        } catch {
          case _: NumberFormatException =>
            log.warning("Ignoring non-numeric message: %s", text)
        }
      case other =>
        log.warning("Ignoring unexpected message type: %s", other)
    }
  }

  /** Logs the exception and closes the channel it was raised on. */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    // Close the connection when an exception is raised.
    log.warning("Unexpected exception from downstream: %s", e.getCause())
    e.getChannel().close()
    // Not necessary because channelClosed will be called?
    // BenchmarkClientHandler.channels.remove(e.getChannel)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mypackage.benchmark | |
import com.mypackage.util.Logging | |
import net.lag.configgy.Configgy | |
import java.net.InetSocketAddress | |
import java.util.concurrent._ | |
import java.util.concurrent.atomic.AtomicLong | |
import java.nio.charset.Charset | |
import org.jboss.netty.bootstrap.ServerBootstrap | |
import org.jboss.netty.buffer.ChannelBuffer | |
import org.jboss.netty.channel.ChannelPipeline | |
import org.jboss.netty.channel.ChannelPipelineFactory | |
import org.jboss.netty.channel.Channels | |
import org.jboss.netty.channel.ChannelEvent | |
import org.jboss.netty.channel.ChannelHandlerContext | |
import org.jboss.netty.channel.ChannelStateEvent | |
import org.jboss.netty.channel.ExceptionEvent | |
import org.jboss.netty.channel.MessageEvent | |
import org.jboss.netty.channel.SimpleChannelUpstreamHandler | |
import org.jboss.netty.channel.group._ | |
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory | |
import org.jboss.netty.handler.execution.ExecutionHandler | |
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor | |
import org.jboss.netty.handler.codec.string._ | |
import org.jboss.netty.handler.codec.frame.Delimiters | |
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder | |
import org.jboss.netty.handler.timeout._ | |
import org.jboss.netty.util.HashedWheelTimer | |
/**
 * Benchmark server entry point.
 *
 * Binds a Netty server on the `port` read from `config/nettybenchmark.conf`.
 * Each accepted connection gets a pipeline of line-delimited UTF-8 strings, an
 * idle-state handler, the shared execution handler and a
 * [[BenchmarkServerHandler]].
 */
object BenchmarkServerMain extends Logging {
  // Configure the server
  val bossExecutor: ExecutorService = Executors.newCachedThreadPool()
  val workerExecutor: ExecutorService = Executors.newCachedThreadPool()
  val channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor)
  val bootstrap = new ServerBootstrap(channelFactory)
  val timer = new HashedWheelTimer()
  val orderedMemoryAwareThreadPoolExecutor = new OrderedMemoryAwareThreadPoolExecutor(
    100, // core pool size
    0,   // maxChannelMemorySize, 0 to disable,
    0    // maxTotalMemorySize, 0 to disable
  )
  val executionHandler = new ExecutionHandler(orderedMemoryAwareThreadPoolExecutor)

  /**
   * Configures the bootstrap, binds the listening socket and registers the
   * resulting server channel in the shared channel group so that `shutdown`
   * closes it together with the accepted connections.
   */
  def main(args: Array[String]): Unit = {
    Configgy.configure("config/nettybenchmark.conf")
    log.info("Launching Netty Server benchmark")

    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      def getPipeline(): ChannelPipeline = {
        val pipeline = Channels.pipeline()
        pipeline.addLast("frameDecoder",
          new DelimiterBasedFrameDecoder(65536, true,
            Delimiters.lineDelimiter()(0),
            Delimiters.lineDelimiter()(1)))
        pipeline.addLast("decoder", new StringDecoder(Charset.forName("UTF-8")))
        pipeline.addLast("encoder", new StringEncoder(Charset.forName("UTF-8")))
        pipeline.addLast("idleHandler", new IdleStateHandler(timer, 0, 5, 0))
        pipeline.addLast("pipelineExecuter", executionHandler)
        pipeline.addLast("handler", new BenchmarkServerHandler())
        pipeline
      }
    })

    // Bind and start to accept incoming connections.
    bootstrap.setOption("child.keepAlive", true)    // for mobiles & our stateful app
    bootstrap.setOption("child.tcpNoDelay", true)   // better latency over bandwidth
    bootstrap.setOption("reuseAddress", true)       // kernel optimization
    bootstrap.setOption("child.reuseAddress", true) // kernel optimization

    val config = Configgy.config
    val port = config.getInt("port").get

    // Track the listening channel as well; otherwise shutdown would close only
    // the accepted connections and leave the server socket bound while the
    // factory's resources are being released.
    val serverChannel = bootstrap.bind(new InetSocketAddress(port))
    BenchmarkServerHandler.channels.add(serverChannel)
  }

  /**
   * Closes every tracked channel (blocking until done), stops the timer and
   * releases the bootstrap, execution-handler pool, channel factory and both
   * executors.
   */
  def shutdown = {
    BenchmarkServerHandler.channels.close().awaitUninterruptibly()
    timer.stop()
    bootstrap.releaseExternalResources
    orderedMemoryAwareThreadPoolExecutor.shutdownNow
    channelFactory.releaseExternalResources
    bossExecutor.shutdownNow
    workerExecutor.shutdownNow
  }
}
/**
 * State shared by all [[BenchmarkServerHandler]] instances.
 */
object BenchmarkServerHandler {
  /** Group to which handlers add their channel on connect and remove it on close. */
  val channels: DefaultChannelGroup = new DefaultChannelGroup()
}
/**
 * Per-connection server handler: sends the current time as a keep-alive line
 * whenever the connection has been write-idle, discards inbound messages, and
 * keeps the shared channel group up to date.
 */
class BenchmarkServerHandler extends IdleStateAwareChannelUpstreamHandler with Logging {

  /** On a writer-idle event, writes the current time in milliseconds followed by a newline. */
  override def channelIdle(ctx: ChannelHandlerContext, e: IdleStateEvent): Unit = {
    // send keep alive ping
    if (e.getState == IdleState.WRITER_IDLE) {
      val now = System.currentTimeMillis
      ctx.getChannel.write(now + "\n")
    }
  }

  /** Inbound messages are intentionally ignored. */
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
    // Discard received data silently by doing nothing.
    //transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes())
  }

  /** Logs the exception and closes the channel it was raised on. */
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    // Close the connection when an exception is raised.
    // The cause is a format argument, so the message needs a placeholder for it
    // to show up in the log (same form as the client handler uses).
    log.warning("Unexpected exception from downstream: %s", e.getCause())
    e.getChannel().close()
  }

  /** Starts tracking the newly connected channel. */
  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelConnected(ctx, e)
    BenchmarkServerHandler.channels.add(ctx.getChannel)
  }

  /** Stops tracking the channel once it has been closed. */
  override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    super.channelClosed(ctx, e)
    BenchmarkServerHandler.channels.remove(ctx.getChannel)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment