Skip to content

Instantly share code, notes, and snippets.

@buchgr
Last active August 29, 2015 13:57
Show Gist options
  • Select an option

  • Save buchgr/9691824 to your computer and use it in GitHub Desktop.

Select an option

Save buchgr/9691824 to your computer and use it in GitHub Desktop.
// Welcome back! I just wrote this code down off the top of my head to express the main ideas.
// I didn't compile or run it. It's mostly pseudocode anyway.
// The problem of associating tasks with channels could be solved by
// having the developer tell us which channel a task belongs to;
// this would make filtering the tasks on migration very easy.
// The downside is that it is subject to user error.
//
// Example usage: the task is handed over together with the channel it belongs to.
Runnable flushTask = new Runnable() {
    @Override
    public void run() {
        ch.flush();
    }
};
// The channel argument is null if the task doesn't belong to any particular channel.
execute(ch, flushTask);
public class SingleThreadEventExecutor {
private static class ChannelTask implements Runnable {
ChannelTask(Channel ch, Runnable task) {
...
}
// returns the channel this task is associated with
Channel channel();
public void run() {
task.run();
}
}
@Override
public void execute(Channel ch, Runnable task) {
...
addTask(new ChannelTask(ch, task));
}
}
public class NioEventLoop ... {
    // true on the loop that channels are being migrated away from
    private boolean migrateFrom;
    // true on the loop that channels are being migrated to
    private volatile boolean migrateTo;
    // Target loop and channel filter of the pending migration; set by
    // migrateChannels(...) and cleared again by migrateChannels0(...).
    private NioEventLoop destLoop;
    private ChannelMatcher matcher;

    // This method will be called by a user who wants to migrate channels
    // to destLoop.
    // maybe return a Future/Promise, not sure if that's interesting for the user though?
    // don't think so
    // I would see the main use case for this method in the "pluggable algorithm to choose next event loop"
    // that I am developing in the first part of my proposal
    @Override
    public void migrateChannels(NioEventLoop destLoop, ChannelMatcher matcher) {
        // NOTE(review): as in the original sketch, only the flag update is guarded by
        // inEventLoop(); the handling of a call from another thread is presumably part
        // of the elided code below -- confirm.
        if (inEventLoop()) {
            migrateFrom = destLoop.migrateTo = true;
        }
        this.destLoop = destLoop;
        this.matcher = matcher;
        ...
    }

    @Override
    public void run() {
        ...
        selector.select();
        processSelectedKeys(...);
        runAllTasks();
        if (migrateFrom) {
            // (this will be run in the source event loop, where we are migrating from)
            // enqueue this in the destination event loop's executor, so that
            // once the run method of the event loop is finished this will
            // be executed.
            // For the executor I would suggest to use a ThreadPoolExecutor,
            // something like Executors.newFixedThreadPool(1)
            destLoop.executor.execute(new Runnable() {
                @Override
                public void run() {
                    // A plain `this` here would be the anonymous Runnable, not the
                    // source event loop, so the enclosing instance is named explicitly.
                    destLoop.migrateChannels0(NioEventLoop.this, matcher);
                }
            });
        } else if (!migrateTo) {
            executor.execute(this);
        }
    }

    // this will be executed by the event loop that we are migrating the
    // channels to
    private void migrateChannels0(NioEventLoop sourceLoop, ChannelMatcher matcher) {
        Set<Channel> channels = ...;
        for (SelectionKey key: sourceLoop.selector.keys()) {
            Channel ch = key.channel();
            if (matcher.matches(ch)) {
                // Read the key's state before cancelling it: interestOps() throws
                // CancelledKeyException once the key has been cancelled.
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                key.cancel();
                ch.register(selector, interestOps, attachment);
                channels.add(ch);
            }
        }
        // perform merge operations for sourceLoop.taskQueue with taskQueue,
        // sourceLoop.delayedTaskQueue with delayedTaskQueue and
        // sourceLoop.shutdownHooks with shutdownHooks.
        //
        // I was thinking about adding timestamps to the tasks so that we could
        // merge the two queues nicely in order, however I am not sure if that's really necessary
        // since the delayedTaskQueue is separate anyway...
        for (...) {
            if (channels.contains(task.channel())) {
                // append to taskQueue ...
            }
        }
        // reset everything
        // (this method runs on the destination loop, so `this` is the loop that was
        // migrated to and sourceLoop is the loop that was migrated from)
        sourceLoop.migrateFrom = migrateTo = false;
        sourceLoop.destLoop = null;
        sourceLoop.matcher = null;
        // back to normal
        sourceLoop.executor.execute(sourceLoop);
        executor.execute(this);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment