Created
June 2, 2011 09:01
-
-
Save Arbow/1004144 to your computer and use it in GitHub Desktop.
Netty 3.2.4 idle event trigger patch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java | |
--- a/src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/channel/socket/DefaultSocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -17,6 +17,7 @@ | |
import java.net.Socket; | |
import java.net.SocketException; | |
+import java.util.concurrent.TimeUnit; | |
import org.jboss.netty.channel.ChannelException; | |
import org.jboss.netty.channel.DefaultChannelConfig; | |
@@ -66,6 +67,12 @@ | |
setSoLinger(ConversionUtil.toInt(value)); | |
} else if (key.equals("trafficClass")) { | |
setTrafficClass(ConversionUtil.toInt(value)); | |
+ } else if (key.equals("readerIdleTime")) { | |
+ setReaderIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS); | |
+ } else if (key.equals("writerIdleTime")) { | |
+ setWriterIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS); | |
+ } else if (key.equals("allIdleTime")) { | |
+ setAllIdleTime(ConversionUtil.toLong(value), TimeUnit.SECONDS); | |
} else { | |
return false; | |
} | |
@@ -192,4 +199,34 @@ | |
throw new ChannelException(e); | |
} | |
} | |
+ /** Reader idle time, kept in milliseconds; initially {@code 0}. */ | |
+ private long readerIdleTimeMillis; | |
+ /** Writer idle time, kept in milliseconds; initially {@code 0}. */ | |
+ private long writerIdleTimeMillis; | |
+ /** All (read and write) idle time, kept in milliseconds; initially {@code 0}. */ | |
+ private long allIdleTimeMillis; | |
+ /** Stores the reader idle time after converting it from {@code unit} to milliseconds. */ | |
+ public void setReaderIdleTime(long readerIdleTime, TimeUnit unit) { | |
+ readerIdleTimeMillis = unit.toMillis(readerIdleTime); | |
+ } | |
+ /** Stores the writer idle time after converting it from {@code unit} to milliseconds. */ | |
+ public void setWriterIdleTime(long writerIdleTime, TimeUnit unit) { | |
+ writerIdleTimeMillis = unit.toMillis(writerIdleTime); | |
+ } | |
+ /** Stores the all idle time after converting it from {@code unit} to milliseconds. */ | |
+ public void setAllIdleTime(long allIdleTime, TimeUnit unit) { | |
+ allIdleTimeMillis = unit.toMillis(allIdleTime); | |
+ } | |
+ /** Returns the stored reader idle time converted from milliseconds to {@code unit}. */ | |
+ public long getReaderIdleTime(TimeUnit unit) { | |
+ return unit.convert(readerIdleTimeMillis, TimeUnit.MILLISECONDS); | |
+ } | |
+ /** Returns the stored writer idle time converted from milliseconds to {@code unit}. */ | |
+ public long getWriterIdleTime(TimeUnit unit) { | |
+ return unit.convert(writerIdleTimeMillis, TimeUnit.MILLISECONDS); | |
+ } | |
+ /** Returns the stored all idle time converted from milliseconds to {@code unit}. */ | |
+ public long getAllIdleTime(TimeUnit unit) { | |
+ return unit.convert(allIdleTimeMillis, TimeUnit.MILLISECONDS); | |
+ } | |
} | |
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java | |
--- a/src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/channel/socket/SocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -16,8 +16,11 @@ | |
package org.jboss.netty.channel.socket; | |
import java.net.Socket; | |
+import java.util.concurrent.TimeUnit; | |
import org.jboss.netty.channel.ChannelConfig; | |
+import org.jboss.netty.handler.timeout.IdleState; | |
+import org.jboss.netty.handler.timeout.IdleStateEvent; | |
/** | |
* A {@link ChannelConfig} for a {@link SocketChannel}. | |
@@ -130,4 +133,43 @@ | |
*/ | |
void setPerformancePreferences( | |
int connectionTime, int latency, int bandwidth); | |
+ | |
+ /** | |
+ * An {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} | |
+ * will be triggered when no read was performed for the specified period of | |
+ * time. Specify {@code 0} to disable. | |
+ * | |
+ * @param readerIdleTime the period without reads after which the event is triggered; {@code 0} disables it | |
+ * @param unit | |
+ * the {@link TimeUnit} of {@code readerIdleTime} | |
+ */ | |
+ void setReaderIdleTime(long readerIdleTime, TimeUnit unit); | |
+ /** Returns the reader idle time expressed in {@code unit}. */ | |
+ long getReaderIdleTime(TimeUnit unit); | |
+ | |
+ /** | |
+ * An {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE} | |
+ * will be triggered when no write was performed for the specified period of | |
+ * time. Specify {@code 0} to disable. | |
+ * | |
+ * @param writerIdleTime the period without writes after which the event is triggered; {@code 0} disables it | |
+ * @param unit | |
+ * the {@link TimeUnit} of {@code writerIdleTime} | |
+ */ | |
+ void setWriterIdleTime(long writerIdleTime, TimeUnit unit); | |
+ /** Returns the writer idle time expressed in {@code unit}. */ | |
+ long getWriterIdleTime(TimeUnit unit); | |
+ | |
+ /** | |
+ * An {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE} will | |
+ * be triggered when neither read nor write was performed for the specified | |
+ * period of time. Specify {@code 0} to disable. | |
+ * | |
+ * @param allIdleTime the period without reads and writes after which the event is triggered; {@code 0} disables it | |
+ * @param unit | |
+ * the {@link TimeUnit} of {@code allIdleTime} | |
+ */ | |
+ void setAllIdleTime(long allIdleTime, TimeUnit unit); | |
+ /** Returns the all idle time expressed in {@code unit}. */ | |
+ long getAllIdleTime(TimeUnit unit); | |
} | |
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java | |
--- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingSocketChannelConfig.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -17,6 +17,7 @@ | |
import java.util.Map; | |
import java.util.Map.Entry; | |
+import java.util.concurrent.TimeUnit; | |
import javax.net.ssl.SSLContext; | |
import javax.net.ssl.SSLEngine; | |
@@ -321,4 +322,28 @@ | |
public void setPipelineFactory(ChannelPipelineFactory pipelineFactory) { | |
channel.realChannel.getConfig().setPipelineFactory(pipelineFactory); | |
} | |
+ /** Setting the reader idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public void setReaderIdleTime(long readerIdleTime, TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ /** Reading the reader idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public long getReaderIdleTime(TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ /** Setting the writer idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public void setWriterIdleTime(long writerIdleTime, TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ /** Reading the writer idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public long getWriterIdleTime(TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ /** Setting the all idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public void setAllIdleTime(long allIdleTime, TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
+ /** Reading the all idle time is not supported by this config: always throws {@link UnsupportedOperationException}. */ | |
+ public long getAllIdleTime(TimeUnit unit) { | |
+ throw new UnsupportedOperationException(); | |
+ } | |
} | |
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java | |
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -34,6 +34,7 @@ | |
import org.jboss.netty.channel.ChannelSink; | |
import org.jboss.netty.channel.MessageEvent; | |
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; | |
+import org.jboss.netty.util.Timeout; | |
import org.jboss.netty.util.internal.LinkedTransferQueue; | |
import org.jboss.netty.util.internal.ThreadLocalBoolean; | |
@@ -74,6 +75,15 @@ | |
MessageEvent currentWriteEvent; | |
SendBuffer currentWriteBuffer; | |
+ volatile long lastWriteOperationTime; | |
+ long lastReadOperationTime; | |
+ long lastReaderIdleEventTime; // last READER_IDLE reported time | |
+ long lastWriterIdleEventTime; // last WRITER_IDLE reported time | |
+ long lastAllIdleEventTime; // last ALL_IDLE reported time | |
+ Timeout readIdleCheckTimeout; | |
+ Timeout writeIdleCheckTimeout; | |
+ Timeout allIdleCheckTimeout; | |
+ | |
public NioSocketChannel( | |
Channel parent, ChannelFactory factory, | |
ChannelPipeline pipeline, ChannelSink sink, | |
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java | |
--- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -32,6 +32,7 @@ | |
import java.util.Set; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.ExecutorService; | |
+import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.locks.ReadWriteLock; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
@@ -44,9 +45,14 @@ | |
import org.jboss.netty.channel.MessageEvent; | |
import org.jboss.netty.channel.ReceiveBufferSizePredictor; | |
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; | |
+import org.jboss.netty.handler.timeout.DefaultIdleStateEvent; | |
+import org.jboss.netty.handler.timeout.IdleState; | |
import org.jboss.netty.logging.InternalLogger; | |
import org.jboss.netty.logging.InternalLoggerFactory; | |
+import org.jboss.netty.util.HashedWheelTimer; | |
import org.jboss.netty.util.ThreadRenamingRunnable; | |
+import org.jboss.netty.util.Timeout; | |
+import org.jboss.netty.util.TimerTask; | |
import org.jboss.netty.util.internal.DeadLockProofWorker; | |
import org.jboss.netty.util.internal.LinkedTransferQueue; | |
@@ -83,10 +89,16 @@ | |
private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool(); | |
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); | |
+ private final HashedWheelTimer idleCheckTimer; | |
+ private final Queue<NioSocketChannel> idleChannels; | |
+ | |
NioWorker(int bossId, int id, Executor executor) { | |
this.bossId = bossId; | |
this.id = id; | |
this.executor = executor; | |
+ this.idleCheckTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS); | |
+ this.idleCheckTimer.start(); | |
+ this.idleChannels = new LinkedTransferQueue<NioSocketChannel>(); | |
} | |
void register(NioSocketChannel channel, ChannelFuture future) { | |
@@ -137,6 +149,8 @@ | |
started = true; | |
boolean offered = registerTaskQueue.offer(registerTask); | |
assert offered; | |
+ | |
+ registerIdleCheckTasks(channel); | |
} | |
if (wakenUp.compareAndSet(false, true)) { | |
@@ -144,6 +158,135 @@ | |
} | |
} | |
+ private void registerIdleCheckTasks(final NioSocketChannel channel) { /* arms one idle-check timer for each idle time configured with a positive value */ | |
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS); | |
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS); | |
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS); | |
+ TimerTask idleCheckTask = new TimerTask() { /* shared by all three timers; on expiry it only queues the channel in idleChannels */ | |
+ public void run(Timeout timeout) throws Exception { | |
+ idleChannels.offer(channel); | |
+ } | |
+ }; | |
+ if (readerIdleTime > 0) { | |
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, readerIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ if (writerIdleTime > 0) { | |
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, writerIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ if (allIdleTime > 0) { | |
+ channel.allIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, allIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ } | |
+ /** Stamps the read time, restarts the reader-idle timer if one is armed, then restarts the all-idle timer the same way. */ | |
+ private void updateChannelReadIdleCheck(final NioSocketChannel channel) { | |
+ channel.lastReadOperationTime = System.currentTimeMillis(); | |
+ if (channel.readIdleCheckTimeout != null) { | |
+ channel.readIdleCheckTimeout.cancel(); | |
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS); | |
+ TimerTask idleCheckTask = new TimerTask() { | |
+ public void run(Timeout timeout) throws Exception { | |
+ idleChannels.offer(channel); | |
+ } | |
+ }; | |
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, readerIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ updateChannelAllIdleCheck(channel); | |
+ } | |
+ /** Stamps the write time, restarts the writer-idle timer if one is armed, then restarts the all-idle timer the same way. */ | |
+ private void updateChannelWriteIdleCheck(final NioSocketChannel channel) { | |
+ channel.lastWriteOperationTime = System.currentTimeMillis(); | |
+ if (channel.writeIdleCheckTimeout != null) { | |
+ channel.writeIdleCheckTimeout.cancel(); | |
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS); | |
+ TimerTask idleCheckTask = new TimerTask() { | |
+ public void run(Timeout timeout) throws Exception { | |
+ idleChannels.offer(channel); | |
+ } | |
+ }; | |
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, writerIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ updateChannelAllIdleCheck(channel); | |
+ } | |
+ /** Restarts the all-idle timer, if one is armed, so that it expires one full all-idle period from now. */ | |
+ private void updateChannelAllIdleCheck(final NioSocketChannel channel) { | |
+ if (channel.allIdleCheckTimeout != null) { | |
+ channel.allIdleCheckTimeout.cancel(); | |
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS); | |
+ TimerTask idleCheckTask = new TimerTask() { | |
+ public void run(Timeout timeout) throws Exception { | |
+ idleChannels.offer(channel); | |
+ } | |
+ }; | |
+ channel.allIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, allIdleTime, TimeUnit.MILLISECONDS); | |
+ } | |
+ } | |
+ /** Sends a {@link DefaultIdleStateEvent} carrying {@code state} and the last activity time upstream through the channel's pipeline. */ | |
+ private void fireChannelIdle(Channel channel, IdleState state, long lastActivityTimeMillis) { | |
+ channel.getPipeline().sendUpstream(new DefaultIdleStateEvent(channel, state, lastActivityTimeMillis)); | |
+ } | |
+ | |
+ /** | |
+ * Tells whether a full idle period has elapsed; {@code channel} and {@code idleState} are not consulted. | |
+ * | |
+ * @param idleTimeout the idle period in milliseconds; a value {@code <= 0} always yields {@code false} | |
+ * @param currentTime the current time in milliseconds | |
+ * @param lastIoTime the reference time in milliseconds that the idle period is measured from | |
+ * @return {@code true} if {@code idleTimeout} is positive and at least that long has passed since {@code lastIoTime} | |
+ */ | |
+ private boolean checkIdleState(Channel channel, IdleState idleState, long idleTimeout, long currentTime, | |
+ long lastIoTime) { | |
+ return idleTimeout > 0 && currentTime - lastIoTime >= idleTimeout; | |
+ } | |
+ | |
+ /** Handles every channel queued by an expired idle timer: fires the idle events that are due, re-arms the matching timer, then dequeues the channel. Invoked from {@link #run()}. */ | |
+ private void processIdleChannels() { | |
+ for (final NioSocketChannel channel:idleChannels) { | |
+ long currentTime = System.currentTimeMillis(); | |
+ if (!channel.isConnected()) { | |
+ idleChannels.remove(channel); | |
+ continue; | |
+ } | |
+ | |
+ // A timer expiry is only a hint: compare against the recorded I/O and event times before reporting. | |
+ long readerIdleTime = channel.getConfig().getReaderIdleTime(TimeUnit.MILLISECONDS); | |
+ long writerIdleTime = channel.getConfig().getWriterIdleTime(TimeUnit.MILLISECONDS); | |
+ long allIdleTime = channel.getConfig().getAllIdleTime(TimeUnit.MILLISECONDS); | |
+ long maxReadIdleTime = Math.max(readerIdleTime, allIdleTime); | |
+ long maxWriteIdleTime = Math.max(writerIdleTime, allIdleTime); | |
+ final TimerTask idleCheckTask = new TimerTask() { | |
+ public void run(Timeout timeout) throws Exception { | |
+ idleChannels.offer(channel); | |
+ } | |
+ }; | |
+ | |
+ if (checkIdleState(channel, IdleState.READER_IDLE, readerIdleTime, currentTime, | |
+ Math.max(channel.lastReadOperationTime, channel.lastReaderIdleEventTime))) { | |
+ channel.lastReaderIdleEventTime = currentTime; | |
+ channel.readIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxReadIdleTime, TimeUnit.MILLISECONDS); | |
+ fireChannelIdle(channel, IdleState.READER_IDLE, channel.lastReadOperationTime); | |
+ } | |
+ | |
+ if (checkIdleState(channel, IdleState.WRITER_IDLE, writerIdleTime, currentTime, | |
+ Math.max(channel.lastWriteOperationTime, channel.lastWriterIdleEventTime))) { | |
+ channel.lastWriterIdleEventTime = currentTime; | |
+ channel.writeIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, maxWriteIdleTime, TimeUnit.MILLISECONDS); | |
+ fireChannelIdle(channel, IdleState.WRITER_IDLE, channel.lastWriteOperationTime); | |
+ } | |
+ | |
+ long lastIoTime = Math.max(channel.lastReadOperationTime, channel.lastWriteOperationTime); | |
+ if (checkIdleState(channel, IdleState.ALL_IDLE, allIdleTime, currentTime, | |
+ Math.max(lastIoTime, channel.lastAllIdleEventTime))) { | |
+ channel.lastAllIdleEventTime = currentTime; | |
+ // Re-arm the ALL_IDLE timer itself. Replacing the reader/writer timers here would orphan their | |
+ // pending timeouts and create timers for idle types that may be disabled (idle time 0). | |
+ channel.allIdleCheckTimeout = idleCheckTimer.newTimeout(idleCheckTask, allIdleTime, TimeUnit.MILLISECONDS); | |
+ fireChannelIdle(channel, IdleState.ALL_IDLE, lastIoTime); | |
+ } | |
+ | |
+ idleChannels.remove(channel); | |
+ } | |
+ } | |
+ | |
public void run() { | |
thread = Thread.currentThread(); | |
@@ -198,13 +341,17 @@ | |
processRegisterTaskQueue(); | |
processWriteTaskQueue(); | |
processSelectedKeys(selector.selectedKeys()); | |
+ | |
+ Set<SelectionKey> keys = selector.keys(); | |
+ | |
+ processIdleChannels(); | |
// Exit the loop when there's nothing to handle. | |
// The shutdown flag is used to delay the shutdown of this | |
// loop to avoid excessive Selector creation when | |
// connections are registered in a one-by-one manner instead of | |
// concurrent manner. | |
- if (selector.keys().isEmpty()) { | |
+ if (keys.isEmpty()) { | |
if (shutdown || | |
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { | |
@@ -244,6 +391,7 @@ | |
} | |
} | |
} | |
+ this.idleCheckTimer.stop(); | |
} | |
private void processRegisterTaskQueue() throws IOException { | |
@@ -347,6 +495,7 @@ | |
// Fire the event. | |
fireMessageReceived(channel, buffer); | |
+ updateChannelReadIdleCheck(channel); | |
} else { | |
recvBufferPool.release(bb); | |
} | |
@@ -475,6 +624,8 @@ | |
break; | |
} | |
} | |
+ | |
+ updateChannelWriteIdleCheck(channel); | |
if (buf.finished()) { | |
// Successful write - proceed to the next message. | |
@@ -776,6 +927,10 @@ | |
"Failed to register a socket to the selector.", e); | |
} | |
} | |
+ | |
+ // TODO where should it be placed? | |
+ channel.lastReadOperationTime = System.currentTimeMillis(); | |
+ channel.lastWriteOperationTime = System.currentTimeMillis(); | |
if (!server) { | |
if (!((NioClientSocketChannel) channel).boundManually) { | |
diff -r a0c46f1a4e8b src/main/java/org/jboss/netty/util/internal/ConversionUtil.java | |
--- a/src/main/java/org/jboss/netty/util/internal/ConversionUtil.java Thu Jun 02 14:13:48 2011 +0800 | |
+++ b/src/main/java/org/jboss/netty/util/internal/ConversionUtil.java Thu Jun 02 16:55:06 2011 +0800 | |
@@ -42,6 +42,17 @@ | |
} | |
/** | |
+ * Converts the specified object into a long: a {@link Number} is converted with | |
+ * {@code longValue()}; any other value is parsed from its string form. | |
+ */ | |
+ public static long toLong(Object value) { | |
+ if (!(value instanceof Number)) { | |
+ return Long.parseLong(String.valueOf(value)); | |
+ } | |
+ return ((Number) value).longValue(); | |
+ } | |
+ | |
+ /** | |
* Converts the specified object into a boolean. | |
*/ | |
public static boolean toBoolean(Object value) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment