Created
September 20, 2012 00:51
-
-
Save keesun/3753306 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.vertx.java.core.Handler; | |
import org.vertx.java.core.eventbus.Message; | |
import org.vertx.java.deploy.Verticle; | |
import java.util.concurrent.atomic.AtomicLong; | |
/** | |
* @author <a href="http://tfox.org">Tim Fox</a> | |
*/ | |
public class Manager extends Verticle { | |
static int workerCount = 4; | |
static int batchSize = 500000; | |
AtomicLong replyCount = new AtomicLong(0); | |
@Override | |
public void start() throws Exception { | |
container.deployVerticle("Worker.java", null, workerCount, new Handler<String>() { | |
@Override | |
public void handle(String id) { | |
if (id != null) { | |
System.out.println("Deployed"); | |
publishMessages(); | |
} | |
} | |
}); | |
//Then wait for replies before sending some more - this is a crude flow control | |
vertx.eventBus().registerHandler("bar", new Handler<Message<String>>() { | |
@Override | |
public void handle(Message<String> str) { | |
long count = replyCount.incrementAndGet(); | |
if (count % workerCount == 0) { | |
displayRate(); | |
publishMessages(); | |
} | |
} | |
long last = System.currentTimeMillis(); | |
void displayRate() { | |
long now = System.currentTimeMillis(); | |
double rate = 1000 * (double)workerCount * batchSize / (now - last); | |
last = now; | |
System.out.println("Rate is " + rate + " msgs/sec"); | |
} | |
}); | |
} | |
private void publishMessages() { | |
for (int i = 0; i < batchSize; i++) { | |
vertx.eventBus().publish("foo", "whatever"); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.vertx.java.core.Handler; | |
import org.vertx.java.core.eventbus.Message; | |
import org.vertx.java.core.json.JsonObject; | |
import org.vertx.java.deploy.Verticle; | |
import java.util.concurrent.atomic.AtomicLong; | |
/** | |
* @author <a href="http://tfox.org">Tim Fox</a> | |
*/ | |
public class Worker extends Verticle { | |
AtomicLong messageCount = new AtomicLong(0); | |
@Override | |
public void start() throws Exception { | |
System.out.println("Starting worker"); | |
vertx.eventBus().registerHandler("foo", new Handler<Message<JsonObject>>() { | |
@Override | |
public void handle(Message<JsonObject> message) { | |
long count = messageCount.incrementAndGet(); | |
if (count % Manager.batchSize == 0) { | |
vertx.eventBus().send("bar", "whatever"); | |
} | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment