Last active
November 5, 2018 19:27
-
-
Save guidomedina/ddd729492c8fb8148032 to your computer and use it in GitHub Desktop.
MpscBoundedMailbox
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorRef; | |
import akka.dispatch.*; | |
import org.jctools.queues.MpscArrayQueue; | |
/** | |
* Non-blocking, multiple producer, single consumer high performance bounded message queue, | |
* this implementation is similar but simpler than LMAX disruptor. | |
*/ | |
public final class MpscBoundedMailbox implements MessageQueue { | |
private final MpscArrayQueue<Envelope> queue; | |
public MpscBoundedMailbox(int capacity) { | |
queue = new MpscArrayQueue<>(capacity); | |
} | |
@Override | |
public void enqueue(ActorRef receiver, Envelope handle) { | |
queue.offer(handle); | |
} | |
@Override | |
public Envelope dequeue() { | |
return queue.poll(); | |
} | |
@Override | |
public int numberOfMessages() { | |
return queue.size(); | |
} | |
@Override | |
public boolean hasMessages() { | |
return !queue.isEmpty(); | |
} | |
@Override | |
public void cleanUp(ActorRef owner, MessageQueue deadLetters) { | |
if (hasMessages()) { | |
Envelope envelope; | |
while ((envelope = queue.poll()) != null) { | |
deadLetters.enqueue(owner, envelope); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.*; | |
import akka.dispatch.*; | |
import com.typesafe.config.Config; | |
import scala.Option; | |
/** | |
* Non-blocking, multiple producer, single consumer high performance bounded mailbox, | |
* this implementation is similar but simpler than LMAX disruptor. | |
*/ | |
public final class MpscBoundedMailboxType implements MailboxType, ProducesMessageQueue<MpscBoundedMailbox> { | |
/** | |
* Mailbox capacity, a power of two greater or equal will be used. | |
*/ | |
private final int capacity; | |
@SuppressWarnings("UnusedParameters") | |
public MpscBoundedMailboxType(ActorSystem.Settings settings, Config config) { | |
capacity = config.getInt("mailbox-capacity"); | |
if (capacity < 1) { | |
throw new IllegalArgumentException("Mailbox capacity must not be less than 1"); | |
} | |
} | |
@Override | |
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) { | |
return new MpscBoundedMailbox(capacity); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mailbox-type must name the MailboxType implementation (MpscBoundedMailboxType), not the
# MessageQueue class it produces: Akka instantiates it through a
# (ActorSystem.Settings, Config) constructor, which only the MailboxType class declares.
# NOTE(review): the "akka.dispatch" package prefix is kept from the original; confirm it
# matches the package the classes are actually declared in.
bounded-mailbox-256 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 256
}
bounded-mailbox-512 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 512
}
bounded-mailbox-1024 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 1024
}
bounded-mailbox-2048 {
  mailbox-type = "akka.dispatch.MpscBoundedMailboxType"
  mailbox-capacity = 2048
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment