Skip to content

Instantly share code, notes, and snippets.

@guidomedina
Last active November 5, 2018 19:27
Show Gist options
  • Save guidomedina/ddd729492c8fb8148032 to your computer and use it in GitHub Desktop.
Save guidomedina/ddd729492c8fb8148032 to your computer and use it in GitHub Desktop.
MpscBoundedMailbox
import akka.actor.ActorRef;
import akka.dispatch.*;
import org.jctools.queues.MpscArrayQueue;
/**
* Non-blocking, multiple producer, single consumer high performance bounded message queue,
* this implementation is similar but simpler than LMAX disruptor.
*/
public final class MpscBoundedMailbox implements MessageQueue {
private final MpscArrayQueue<Envelope> queue;
public MpscBoundedMailbox(int capacity) {
queue = new MpscArrayQueue<>(capacity);
}
@Override
public void enqueue(ActorRef receiver, Envelope handle) {
queue.offer(handle);
}
@Override
public Envelope dequeue() {
return queue.poll();
}
@Override
public int numberOfMessages() {
return queue.size();
}
@Override
public boolean hasMessages() {
return !queue.isEmpty();
}
@Override
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
if (hasMessages()) {
Envelope envelope;
while ((envelope = queue.poll()) != null) {
deadLetters.enqueue(owner, envelope);
}
}
}
}
import akka.actor.*;
import akka.dispatch.*;
import com.typesafe.config.Config;
import scala.Option;
/**
 * Mailbox type that produces {@link MpscBoundedMailbox} instances: bounded, non-blocking,
 * multiple-producer / single-consumer message queues.
 *
 * <p>The capacity is read from the {@code mailbox-capacity} key of the supplied configuration.
 */
public final class MpscBoundedMailboxType
    implements MailboxType, ProducesMessageQueue<MpscBoundedMailbox> {

  /**
   * Requested mailbox capacity. According to the original author's note, a power of two
   * greater than or equal to this value is what actually gets used.
   */
  private final int capacity;

  /**
   * Reads and validates the mailbox capacity from configuration.
   *
   * @param settings actor system settings (unused here)
   * @param config configuration section containing {@code mailbox-capacity}
   * @throws IllegalArgumentException if the configured capacity is less than 1
   */
  @SuppressWarnings("UnusedParameters")
  public MpscBoundedMailboxType(ActorSystem.Settings settings, Config config) {
    final int configured = config.getInt("mailbox-capacity");
    if (configured <= 0) {
      throw new IllegalArgumentException("Mailbox capacity must not be less than 1");
    }
    this.capacity = configured;
  }

  /**
   * Builds a fresh message queue with the configured capacity.
   *
   * @param owner the actor that will own the queue, if any (not used)
   * @param system the actor system, if any (not used)
   * @return a new {@link MpscBoundedMailbox}
   */
  @Override
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    final MessageQueue mailbox = new MpscBoundedMailbox(capacity);
    return mailbox;
  }
}
# Bounded MPSC mailbox definitions, one per capacity.
# "mailbox-type" must be the fully qualified name of the MailboxType implementation
# (the class with the (ActorSystem.Settings, Config) constructor), not of the MessageQueue.
# NOTE(review): the "akka.dispatch" prefix is kept from the original; adjust it to the
# package where MpscBoundedMailboxType actually lives.
bounded-mailbox-256 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 256
}
bounded-mailbox-512 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 512
}
bounded-mailbox-1024 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 1024
}
bounded-mailbox-2048 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 2048
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment