Skip to content

Instantly share code, notes, and snippets.

@hirthwork
Created November 12, 2014 10:26
Show Gist options
  • Save hirthwork/f0f1a6693690774c1cb0 to your computer and use it in GitHub Desktop.
Save hirthwork/f0f1a6693690774c1cb0 to your computer and use it in GitHub Desktop.
Asynchronous IOSession closing
package org.apache.http.impl.nio.reactor;
import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.http.nio.reactor.IOSession;
public class AsyncCloseableIOSession extends FilterIOSession {
private final AtomicBoolean closed = new AtomicBoolean();
private final Selector selector;
private final Executor executor;
private volatile SelectionKey key = null;
public AsyncCloseableIOSession(
final IOSession session,
final Selector selector,
final Executor executor)
{
super(session);
this.selector = selector;
this.executor = executor;
}
protected void beforeClose() {
final ByteChannel channel = session.channel();
if (channel instanceof SelectableChannel) {
try {
key = ((SelectableChannel) channel).register(selector, 0);
} catch (ClosedChannelException e) {
key = null;
}
}
}
protected void afterClose() {
if (key != null) {
key.cancel();
try {
if (selector.selectNow() > 0) {
synchronized (selector) {
selector.selectedKeys().clear();
}
}
} catch (IOException e) {
}
}
}
@Override
public void close() {
if (!closed.getAndSet(true)) {
session.setEventMask(0);
executor.execute(new Runnable() {
@Override
public void run() {
beforeClose();
AsyncCloseableIOSession.super.close();
afterClose();
}
});
}
}
@Override
public void shutdown() {
if (!closed.getAndSet(true)) {
session.setEventMask(0);
executor.execute(new Runnable() {
@Override
public void run() {
beforeClose();
AsyncCloseableIOSession.super.shutdown();
afterClose();
}
});
}
}
}
package org.apache.http.impl.nio;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.impl.nio.reactor.AsyncCloseableIOSession;
import org.apache.http.nio.reactor.IOSession;
public class AsyncCloseableNHttpServerConnectionFactory
extends DefaultNHttpServerConnectionFactory
{
private final AtomicLong count = new AtomicLong();
private final Executor executor;
private final Selector[] selectors;
public AsyncCloseableNHttpServerConnectionFactory(
final ConnectionConfig config,
final Executor executor,
final int fakeSelectorsCount)
throws IOException
{
super(config);
this.executor = executor;
selectors = new Selector[fakeSelectorsCount];
for (int i = 0; i < fakeSelectorsCount; ++i) {
selectors[i] = Selector.open();
}
}
@Override
public DefaultNHttpServerConnection createConnection(
final IOSession session)
{
return super.createConnection(
new AsyncCloseableIOSession(
session,
selectors[(int) (count.getAndIncrement() % selectors.length)],
executor));
}
}
package org.apache.http.impl.nio.reactor;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionBufferStatus;
public class FilterIOSession implements IOSession {
protected final IOSession session;
public FilterIOSession(final IOSession session) {
this.session = session;
}
@Override
public ByteChannel channel() {
return session.channel();
}
@Override
public SocketAddress getRemoteAddress() {
return session.getRemoteAddress();
}
@Override
public SocketAddress getLocalAddress() {
return session.getLocalAddress();
}
@Override
public int getEventMask() {
return session.getEventMask();
}
@Override
public void setEventMask(final int ops) {
session.setEventMask(ops);
}
@Override
public void setEvent(final int op) {
session.setEvent(op);
}
@Override
public void clearEvent(final int op) {
session.clearEvent(op);
}
@Override
public void close() {
session.close();
}
@Override
public void shutdown() {
session.shutdown();
}
@Override
public int getStatus() {
return session.getStatus();
}
@Override
public boolean isClosed() {
return session.isClosed();
}
@Override
public int getSocketTimeout() {
return session.getSocketTimeout();
}
@Override
public void setSocketTimeout(final int timeout) {
session.setSocketTimeout(timeout);
}
@Override
public void setBufferStatus(final SessionBufferStatus status) {
session.setBufferStatus(status);
}
@Override
public boolean hasBufferedInput() {
return session.hasBufferedInput();
}
@Override
public boolean hasBufferedOutput() {
return session.hasBufferedOutput();
}
@Override
public void setAttribute(final String name, final Object obj) {
session.setAttribute(name, obj);
}
@Override
public Object getAttribute(final String name) {
return session.getAttribute(name);
}
@Override
public Object removeAttribute(final String name) {
return session.removeAttribute(name);
}
@Override
public String toString() {
return session.toString();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment