Created
November 12, 2014 10:26
-
-
Save hirthwork/f0f1a6693690774c1cb0 to your computer and use it in GitHub Desktop.
Asynchronous IOSession closing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.http.impl.nio.reactor; | |
import java.io.IOException; | |
import java.nio.channels.ByteChannel; | |
import java.nio.channels.ClosedChannelException; | |
import java.nio.channels.SelectableChannel; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import org.apache.http.nio.reactor.IOSession; | |
public class AsyncCloseableIOSession extends FilterIOSession { | |
private final AtomicBoolean closed = new AtomicBoolean(); | |
private final Selector selector; | |
private final Executor executor; | |
private volatile SelectionKey key = null; | |
public AsyncCloseableIOSession( | |
final IOSession session, | |
final Selector selector, | |
final Executor executor) | |
{ | |
super(session); | |
this.selector = selector; | |
this.executor = executor; | |
} | |
protected void beforeClose() { | |
final ByteChannel channel = session.channel(); | |
if (channel instanceof SelectableChannel) { | |
try { | |
key = ((SelectableChannel) channel).register(selector, 0); | |
} catch (ClosedChannelException e) { | |
key = null; | |
} | |
} | |
} | |
protected void afterClose() { | |
if (key != null) { | |
key.cancel(); | |
try { | |
if (selector.selectNow() > 0) { | |
synchronized (selector) { | |
selector.selectedKeys().clear(); | |
} | |
} | |
} catch (IOException e) { | |
} | |
} | |
} | |
@Override | |
public void close() { | |
if (!closed.getAndSet(true)) { | |
session.setEventMask(0); | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
beforeClose(); | |
AsyncCloseableIOSession.super.close(); | |
afterClose(); | |
} | |
}); | |
} | |
} | |
@Override | |
public void shutdown() { | |
if (!closed.getAndSet(true)) { | |
session.setEventMask(0); | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
beforeClose(); | |
AsyncCloseableIOSession.super.shutdown(); | |
afterClose(); | |
} | |
}); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.http.impl.nio; | |
import java.io.IOException; | |
import java.nio.channels.Selector; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.apache.http.config.ConnectionConfig; | |
import org.apache.http.impl.nio.reactor.AsyncCloseableIOSession; | |
import org.apache.http.nio.reactor.IOSession; | |
public class AsyncCloseableNHttpServerConnectionFactory | |
extends DefaultNHttpServerConnectionFactory | |
{ | |
private final AtomicLong count = new AtomicLong(); | |
private final Executor executor; | |
private final Selector[] selectors; | |
public AsyncCloseableNHttpServerConnectionFactory( | |
final ConnectionConfig config, | |
final Executor executor, | |
final int fakeSelectorsCount) | |
throws IOException | |
{ | |
super(config); | |
this.executor = executor; | |
selectors = new Selector[fakeSelectorsCount]; | |
for (int i = 0; i < fakeSelectorsCount; ++i) { | |
selectors[i] = Selector.open(); | |
} | |
} | |
@Override | |
public DefaultNHttpServerConnection createConnection( | |
final IOSession session) | |
{ | |
return super.createConnection( | |
new AsyncCloseableIOSession( | |
session, | |
selectors[(int) (count.getAndIncrement() % selectors.length)], | |
executor)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.http.impl.nio.reactor; | |
import java.net.SocketAddress; | |
import java.nio.channels.ByteChannel; | |
import org.apache.http.nio.reactor.IOSession; | |
import org.apache.http.nio.reactor.SessionBufferStatus; | |
public class FilterIOSession implements IOSession { | |
protected final IOSession session; | |
public FilterIOSession(final IOSession session) { | |
this.session = session; | |
} | |
@Override | |
public ByteChannel channel() { | |
return session.channel(); | |
} | |
@Override | |
public SocketAddress getRemoteAddress() { | |
return session.getRemoteAddress(); | |
} | |
@Override | |
public SocketAddress getLocalAddress() { | |
return session.getLocalAddress(); | |
} | |
@Override | |
public int getEventMask() { | |
return session.getEventMask(); | |
} | |
@Override | |
public void setEventMask(final int ops) { | |
session.setEventMask(ops); | |
} | |
@Override | |
public void setEvent(final int op) { | |
session.setEvent(op); | |
} | |
@Override | |
public void clearEvent(final int op) { | |
session.clearEvent(op); | |
} | |
@Override | |
public void close() { | |
session.close(); | |
} | |
@Override | |
public void shutdown() { | |
session.shutdown(); | |
} | |
@Override | |
public int getStatus() { | |
return session.getStatus(); | |
} | |
@Override | |
public boolean isClosed() { | |
return session.isClosed(); | |
} | |
@Override | |
public int getSocketTimeout() { | |
return session.getSocketTimeout(); | |
} | |
@Override | |
public void setSocketTimeout(final int timeout) { | |
session.setSocketTimeout(timeout); | |
} | |
@Override | |
public void setBufferStatus(final SessionBufferStatus status) { | |
session.setBufferStatus(status); | |
} | |
@Override | |
public boolean hasBufferedInput() { | |
return session.hasBufferedInput(); | |
} | |
@Override | |
public boolean hasBufferedOutput() { | |
return session.hasBufferedOutput(); | |
} | |
@Override | |
public void setAttribute(final String name, final Object obj) { | |
session.setAttribute(name, obj); | |
} | |
@Override | |
public Object getAttribute(final String name) { | |
return session.getAttribute(name); | |
} | |
@Override | |
public Object removeAttribute(final String name) { | |
return session.removeAttribute(name); | |
} | |
@Override | |
public String toString() { | |
return session.toString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment