Skip to content

Instantly share code, notes, and snippets.

@jbrisbin
Last active December 22, 2015 00:29
Show Gist options
  • Save jbrisbin/6390129 to your computer and use it in GitHub Desktop.
Save jbrisbin/6390129 to your computer and use it in GitHub Desktop.
Pass data from a Processor to an IO-intensive task in a Reactor backed by a ThreadPoolExecutorDispatcher
// Create a Reactor Environment (1 per JVM).
static final Environment env = new Environment();
// Create a Reactor to handle IO events that's backed by a ThreadPoolExecutorDispatcher.
// To increase concurrent thread count, copy META-INF/reactor/default.properties from
// GitHub repo and put it into your own project, adjusting number of threads from 0 (1 per CPU)
// to whatever you want.
Reactor ioReactor = Reactors.reactor()
.env(env)
.dispatcher(Environment.THREAD_POOL)
.roundRobinEventRouting()
.get();
// Assign multiple, load-balanced handlers for write events.
for(int i = 0; i < maxWriteHandlers; i++) {
ioReactor.on($("write.data"), new Consumer<Event<Buffer>>() {
public void accept(Event<Buffer> ev) { Buffer buff = ev.getData(); /* write Buffer data */ }
});
}
// Create a Processor for high-speed processing using a RingBuffer
Processor<Buffer> proc = new ProcessorSpec<Buffer>()
.dataSupplier(new Supplier<Buffer>() {
public Buffer get() { return new Buffer(256, true); /* create fixed-size Buffers */ }
})
// Pass all events to IO Reactor
.consume(new Consumer<Buffer>() {
public void accept(Buffer buffer) {
// We could also check prefixes, decide to use different Reactors
// or notify different topics. There's only one in this example,
// so everything goes through it.
ioReactor.notify("write.data", Event.wrap(buffer.copy()));
}
})
.get();
// Batch requests for highest throughput. Batch sizes of 256-1024 are usually fastest.
// Any larger and you start to lose efficiency. But YMMV.
proc.batch(512, new Consumer<Buffer>() {
// Get next chunk of data and re-use pre-allocated Buffer
public void accept(Buffer buff) { buff.clear().append(nextChunk()).flip(); }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment