Last active
December 22, 2015 00:29
-
-
Save jbrisbin/6390129 to your computer and use it in GitHub Desktop.
Pass data from a Processor to an IO-intensive task in a Reactor backed by a ThreadPoolExecutorDispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create a Reactor Environment (1 per JVM). | |
static final Environment env = new Environment(); | |
// Create a Reactor to handle IO events that's backed by a ThreadPoolExecutorDispatcher. | |
// To increase concurrent thread count, copy META-INF/reactor/default.properties from | |
// GitHub repo and put it into your own project, adjusting number of threads from 0 (1 per CPU) | |
// to whatever you want. | |
Reactor ioReactor = Reactors.reactor() | |
.env(env) | |
.dispatcher(Environment.THREAD_POOL) | |
.roundRobinEventRouting() | |
.get(); | |
// Assign multiple, load-balanced handlers for write events. A separate Consumer instance is
// registered on every pass of the loop, all of them under the same "write.data" selector.
// NOTE(review): `$` is presumably the statically imported selector factory and
// `maxWriteHandlers` is defined outside this snippet -- confirm both.
for (int i = 0; i < maxWriteHandlers; i++) {
    ioReactor.on($("write.data"), new Consumer<Event<Buffer>>() {
        @Override
        public void accept(Event<Buffer> ev) {
            Buffer buff = ev.getData();
            /* write Buffer data */
        }
    });
}
// Create a Processor for high-speed processing using a RingBuffer | |
Processor<Buffer> proc = new ProcessorSpec<Buffer>() | |
.dataSupplier(new Supplier<Buffer>() { | |
public Buffer get() { return new Buffer(256, true); /* create fixed-size Buffers */ } | |
}) | |
// Pass all events to IO Reactor | |
.consume(new Consumer<Buffer>() { | |
public void accept(Buffer buffer) { | |
// We could also check prefixes, decide to use different Reactors | |
// or notify different topics. There's only one in this example, | |
// so everything goes through it. | |
ioReactor.notify("write.data", Event.wrap(buffer.copy())); | |
} | |
}) | |
.get(); | |
// Batch requests for highest throughput. Batch sizes of 256-1024 are usually fastest.
// Any larger and you start to lose efficiency. But YMMV.
proc.batch(512, new Consumer<Buffer>() {
    @Override
    public void accept(Buffer buff) {
        // Get the next chunk of data and re-use the pre-allocated Buffer: reset it, append
        // the chunk, then flip it. `nextChunk()` is defined outside this snippet.
        buff.clear().append(nextChunk()).flip();
    }
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment