Created
February 22, 2011 03:43
-
-
Save amacleod/838177 to your computer and use it in GitHub Desktop.
Java translation of the rtdealer example from http://zguide.zeromq.org/chapter:all#toc45
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Custom routing Router to Dealer. (XREP to XREQ) | |
* Java version, based on the C version from | |
* http://zguide.zeromq.org/chapter:all#toc45 | |
*/ | |
import org.zeromq.ZMQ; | |
import java.util.Arrays; | |
import java.util.Random; | |
/** | |
* Router-to-dealer custom routing demo. | |
* | |
* The router, in this case the main function, uses XREP. The | |
* dealers, in this case the two worker threads, use XREQ. | |
*/ | |
public class rtdealer { | |
static final String URI = "ipc://routing.ipc"; | |
static final int NOFLAGS = 0; | |
/** | |
* Worker runnable consumes messages until it receives an END | |
* message. | |
*/ | |
public static class Worker implements Runnable { | |
public final String name; | |
private final byte[] END = "END".getBytes(); | |
Worker(String name) { this.name = name; } | |
public void run() { | |
ZMQ.Context context = ZMQ.context(1); | |
ZMQ.Socket socket = context.socket(ZMQ.XREQ); | |
socket.setIdentity(name.getBytes()); | |
socket.connect(URI); | |
int total = 0; | |
boolean finished = false; | |
while (!finished) { | |
byte[] data = socket.recv(NOFLAGS); | |
if (Arrays.equals(data, END)) { | |
finished = true; | |
System.out.println( | |
String.format( | |
"Worker %s received %d messages.", name, total | |
) | |
); | |
} | |
total += 1; | |
} | |
socket.close(); | |
context.term(); | |
} | |
} | |
/* Random number generator to determine message distribution. */ | |
private static Random rand = new Random(); | |
public static void main(String[] args) | |
throws InterruptedException { | |
ZMQ.Context context = ZMQ.context(1); | |
ZMQ.Socket socket = context.socket(ZMQ.XREP); | |
socket.bind(URI); | |
Thread workerA = new Thread(new Worker("A")); | |
Thread workerB = new Thread(new Worker("B")); | |
workerA.start(); | |
workerB.start(); | |
// Wait a second for the workers to connect their sockets. | |
System.out.println("Workers started, sleeping 1 second for warmup."); | |
Thread.sleep(1000); | |
// Send 10 tasks, scattered to A twice as often as B. | |
for (int i = 0; i < 10; i += 1) { | |
byte[] address; | |
if (rand.nextInt() % 3 == 0) { // 1/3 to B. | |
address = "B".getBytes(); | |
} else { // 2/3 to A. | |
address = "A".getBytes(); | |
} | |
socket.send(address, ZMQ.SNDMORE); | |
socket.send("This is the workload.".getBytes(), NOFLAGS); | |
} | |
socket.send("A".getBytes(), ZMQ.SNDMORE); | |
socket.send("END".getBytes(), NOFLAGS); | |
socket.send("B".getBytes(), ZMQ.SNDMORE); | |
socket.send("END".getBytes(), NOFLAGS); | |
socket.close(); | |
context.term(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment