Created
January 18, 2011 13:15
-
-
Save arjones/784413 to your computer and use it in GitHub Desktop.
Kestrel Producer/Consumer example in Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.arjones.sandbox; | |
import net.lag.configgy.Config; | |
import net.lag.kestrel.PersistentQueue; | |
public class KestrelTest { | |
public static void main(String[] args) throws InterruptedException { | |
Config config = new Config(); | |
// using memory only | |
config.setBool("journal", false); | |
PersistentQueue queue = new PersistentQueue("", "", config.copy()); | |
// publish 100 messages | |
for (int i = 0; i < 100; i++) { | |
String value = "Hello World from Java #" + i; | |
// boolean sucess = | |
queue.add(value.getBytes(), 0); | |
// System.out.println("Sucess: " + sucess); | |
} | |
int CONSUMERS = 4; | |
Thread[] threads = new Thread[CONSUMERS]; | |
// start all | |
for (int i = 0; i < CONSUMERS; i++) { | |
threads[i] = new Thread(new QueueConsumer(queue)); | |
threads[i].start(); | |
} | |
// join all | |
for (int i = 0; i < CONSUMERS; i++) { | |
threads[i].join(); | |
} | |
// Flushing the queue remove all messages | |
// queue.flush(); | |
System.out.println("Closing the queue"); | |
queue.close(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.arjones.sandbox; | |
import java.util.Date; | |
import scala.Option; | |
import net.lag.kestrel.PersistentQueue; | |
import net.lag.kestrel.QItem; | |
public class QueueConsumer implements Runnable { | |
private final PersistentQueue queue; | |
public QueueConsumer(PersistentQueue queue) { | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
while (true) { | |
Option<QItem> qitem = queue.remove(); | |
// finish execution when there is no payload | |
if (null == qitem || qitem.isEmpty()) break; | |
// Consume message | |
String message = new String(qitem.get().data()); | |
System.out.println(String.format("\nThread: %s\nQueued at: %s\nMessage: %s\n", Thread.currentThread().getId(), new Date(qitem | |
.get().addTime()), message)); | |
try { | |
System.out.print("."); | |
Thread.sleep(300); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi arjones!
Unfortunately, this example doesn't compile with the latest Kestrel lib. (I just downloaded kestrel-2.1.4.jar). Could you help me with it? I would really appreciate it.
Maria