Skip to content

Instantly share code, notes, and snippets.

@keesun
Created September 20, 2012 00:51
Show Gist options
  • Save keesun/3753306 to your computer and use it in GitHub Desktop.
Save keesun/3753306 to your computer and use it in GitHub Desktop.
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.deploy.Verticle;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Benchmark driver verticle: deploys {@link #workerCount} instances of
 * {@code Worker.java}, publishes messages to the "foo" address in batches of
 * {@link #batchSize}, and prints the achieved throughput each time a full
 * round of replies has arrived on the "bar" address.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class Manager extends Verticle {
    /** Number of Worker instances to deploy; also the number of replies that completes one round. */
    static int workerCount = 4;
    /** Number of messages published per round (Worker reads this to decide when to reply). */
    static int batchSize = 500000;
    /** Total number of replies received on "bar" since start. */
    AtomicLong replyCount = new AtomicLong(0);
    /** Wall-clock time (ms) at which the current round of messages started being published. */
    private long last;

    /**
     * Registers the reply handler, then deploys the workers and kicks off the
     * first batch once deployment has completed.
     *
     * @throws Exception declared by the overridden method; nothing here throws it directly
     */
    @Override
    public void start() throws Exception {
        // Register the reply handler BEFORE deploying, so that receiving replies
        // never depends on how the asynchronous deployment callback is scheduled.
        // Waiting for replies before sending more is a crude form of flow control.
        vertx.eventBus().registerHandler("bar", new Handler<Message<String>>() {
            @Override
            public void handle(Message<String> reply) {
                long count = replyCount.incrementAndGet();
                // One reply per worker means the whole batch has been consumed.
                if (count % workerCount == 0) {
                    displayRate();
                    publishMessages();
                }
            }
        });

        container.deployVerticle("Worker.java", null, workerCount, new Handler<String>() {
            @Override
            public void handle(String id) {
                if (id == null) {
                    // The original code treated a null id as "not deployed" and silently
                    // did nothing; report it so the benchmark does not just hang quietly.
                    System.err.println("Failed to deploy Worker.java");
                    return;
                }
                System.out.println("Deployed");
                // Start the clock here so the first reported rate does not include
                // the time spent deploying the workers.
                last = System.currentTimeMillis();
                publishMessages();
            }
        });
    }

    /**
     * Prints the delivery rate of the round that just completed and restarts
     * the clock for the next round.
     */
    private void displayRate() {
        long now = System.currentTimeMillis();
        // Clamp to 1 ms so a zero-length interval cannot produce "Infinity".
        long elapsed = Math.max(1L, now - last);
        // Every published message is counted once per worker.
        double rate = 1000 * (double) workerCount * batchSize / elapsed;
        last = now;
        System.out.println("Rate is " + rate + " msgs/sec");
    }

    /** Publishes one batch of {@link #batchSize} messages to the "foo" address. */
    private void publishMessages() {
        for (int i = 0; i < batchSize; i++) {
            vertx.eventBus().publish("foo", "whatever");
        }
    }
}
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.deploy.Verticle;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Benchmark worker verticle: counts messages arriving on the "foo" address
 * and sends a single reply to the "bar" address each time another
 * {@code Manager.batchSize} messages have been received.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 */
public class Worker extends Verticle {
    /** Number of messages this worker instance has received on "foo". */
    AtomicLong messageCount = new AtomicLong(0);

    /**
     * Registers the "foo" handler that counts incoming messages.
     *
     * @throws Exception declared by the overridden method; nothing here throws it directly
     */
    @Override
    public void start() throws Exception {
        System.out.println("Starting worker");
        // The manager publishes String bodies, so the handler is typed as
        // Message<String>. The previous Message<JsonObject> declaration only worked
        // because the body was never read; accessing it would have failed with a
        // ClassCastException.
        vertx.eventBus().registerHandler("foo", new Handler<Message<String>>() {
            @Override
            public void handle(Message<String> message) {
                long count = messageCount.incrementAndGet();
                // Acknowledge once per complete batch rather than per message.
                if (count % Manager.batchSize == 0) {
                    vertx.eventBus().send("bar", "whatever");
                }
            }
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment