Created
May 11, 2012 10:00
-
-
Save msackman/2658712 to your computer and use it in GitHub Desktop.
QueueingConsumer extended with the CoDel algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The contents of this file are subject to the Mozilla Public License | |
// Version 1.1 (the "License"); you may not use this file except in | |
// compliance with the License. You may obtain a copy of the License | |
// at http://www.mozilla.org/MPL/ | |
// | |
// Software distributed under the License is distributed on an "AS IS" | |
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See | |
// the License for the specific language governing rights and | |
// limitations under the License. | |
// | |
// The Original Code is RabbitMQ. | |
// | |
// The Initial Developer of the Original Code is VMware, Inc. | |
// Copyright (c) 2012-2012 VMware, Inc. All rights reserved. | |
// | |
package com.rabbitmq.client; | |
import java.io.IOException; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import com.rabbitmq.client.AMQP.BasicProperties; | |
import com.rabbitmq.utility.Utility; | |
public class QueueingConsumerCoDel extends DefaultConsumer { | |
public static final long DEFAULT_TARGET_DELAY_MS = 200; | |
public static final long DEFAULT_INTERVAL_MS = 20; | |
public static final boolean DEFAULT_REQUEUE = true; | |
// Marker object used to signal the queue is in shutdown mode. | |
// It is only there to wake up consumers. The canonical representation | |
// of shutting down is the presence of _shutdown. | |
// Invariant: This is never on _queue unless _shutdown != null. | |
private static final Delivery POISON = new Delivery(null, null, null); | |
private static final Delivery NULL_DELIVERY = new Delivery(null, null, null); | |
private final BlockingQueue<Delivery> _queue; | |
// When this is non-null the queue is in shutdown mode and nextDelivery | |
// should | |
// throw a shutdown signal exception. | |
private volatile ShutdownSignalException _shutdown; | |
private volatile ConsumerCancelledException _cancelled; | |
private long _firstAboveTime = 0; | |
private long _dropNextAt = 0; | |
private long _dropCount = 0; | |
private boolean _dropping = false; | |
private final Channel _chan; | |
private final boolean _requeue; | |
private final long _targetDelay; | |
private final long _interval; | |
public QueueingConsumerCoDel(Channel ch) { | |
this(ch, new LinkedBlockingQueue<Delivery>(), DEFAULT_REQUEUE, | |
DEFAULT_TARGET_DELAY_MS, DEFAULT_INTERVAL_MS); | |
} | |
public QueueingConsumerCoDel(Channel ch, boolean requeue) { | |
this(ch, new LinkedBlockingQueue<Delivery>(), requeue, | |
DEFAULT_TARGET_DELAY_MS, DEFAULT_INTERVAL_MS); | |
} | |
public QueueingConsumerCoDel(Channel ch, boolean requeue, long targetDelay, | |
long interval) { | |
this(ch, new LinkedBlockingQueue<Delivery>(), requeue, targetDelay, | |
interval); | |
} | |
public QueueingConsumerCoDel(Channel ch, BlockingQueue<Delivery> q) { | |
this(ch, q, DEFAULT_REQUEUE, DEFAULT_TARGET_DELAY_MS, | |
DEFAULT_INTERVAL_MS); | |
} | |
public QueueingConsumerCoDel(Channel ch, BlockingQueue<Delivery> q, | |
boolean requeue, long targetDelay, long interval) { | |
super(ch); | |
_queue = q; | |
_chan = ch; | |
_requeue = requeue; | |
_targetDelay = targetDelay; | |
_interval = interval; | |
} | |
@Override | |
public void handleShutdownSignal(String consumerTag, | |
ShutdownSignalException sig) { | |
_shutdown = sig; | |
_queue.add(POISON); | |
} | |
@Override | |
public void handleCancel(String consumerTag) throws IOException { | |
_cancelled = new ConsumerCancelledException(); | |
_queue.add(POISON); | |
} | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, | |
AMQP.BasicProperties properties, byte[] body) throws IOException { | |
checkShutdown(); | |
this._queue.add(new Delivery(envelope, properties, body)); | |
} | |
/** | |
* Check if we are in shutdown mode and if so throw an exception. | |
*/ | |
private void checkShutdown() { | |
if (_shutdown != null) | |
throw Utility.fixStackTrace(_shutdown); | |
} | |
/** | |
* If delivery is not POISON nor null, return it. | |
* <p/> | |
* If delivery, _shutdown and _cancelled are all null, return null. | |
* <p/> | |
* If delivery is POISON re-insert POISON into the queue and throw an | |
* exception if POISONed for no reason. | |
* <p/> | |
* Otherwise, if we are in shutdown mode or cancelled, throw a corresponding | |
* exception. | |
*/ | |
private Delivery handle(Delivery delivery) { | |
if (delivery == POISON || delivery == null | |
&& (_shutdown != null || _cancelled != null)) { | |
if (delivery == POISON) { | |
_queue.add(POISON); | |
if (_shutdown == null && _cancelled == null) { | |
throw new IllegalStateException( | |
"POISON in queue, but null _shutdown and null _cancelled. " | |
+ "This should never happen, please report as a BUG"); | |
} | |
} | |
if (null != _shutdown) | |
throw Utility.fixStackTrace(_shutdown); | |
if (null != _cancelled) | |
throw Utility.fixStackTrace(_cancelled); | |
} | |
return delivery; | |
} | |
/** | |
* Main application-side API: wait for the next message delivery and return | |
* it. | |
* | |
* @return the next message | |
* @throws InterruptedException | |
* if an interrupt is received while waiting | |
* @throws ShutdownSignalException | |
* if the connection is shut down while waiting | |
* @throws IOException | |
* if an IOException gets thrown | |
*/ | |
public Delivery nextDelivery() throws InterruptedException, | |
ShutdownSignalException, ConsumerCancelledException, IOException { | |
Delivery d = dequeue(false); | |
while (true) { | |
if (d == NULL_DELIVERY) { | |
_dropping = false; | |
} | |
if (_dropping) { | |
if (!d.dropable) { | |
_dropping = false; | |
} else if (d.dequeuedAt >= _dropNextAt) { | |
while (d.dequeuedAt >= _dropNextAt && _dropping) { | |
_chan.basicNack(d.getEnvelope().getDeliveryTag(), | |
false, _requeue); | |
_dropCount++; | |
d = dequeue(false); | |
if (!d.dropable) { | |
_dropping = false; | |
} else { | |
_dropNextAt += controlLaw(); | |
} | |
} | |
} | |
} else if (d.dropable | |
&& ((d.dequeuedAt - _dropNextAt < _interval) || (d.dequeuedAt | |
- _firstAboveTime >= _interval))) { | |
_chan.basicNack(d.getEnvelope().getDeliveryTag(), false, | |
_requeue); | |
_dropping = true; | |
if (d.dequeuedAt - _dropNextAt < _interval) { | |
_dropCount = _dropCount > 2 ? _dropCount - 2 : 1; | |
} else { | |
_dropCount = 1; | |
} | |
_dropNextAt = d.dequeuedAt + controlLaw(); | |
d = NULL_DELIVERY; | |
} | |
if (d == NULL_DELIVERY) { | |
d = dequeue(true); | |
} else { | |
return d; | |
} | |
} | |
} | |
private long controlLaw() { | |
return (long) (_interval / Math.sqrt(_dropCount)); | |
} | |
private Delivery dequeue(boolean block) throws InterruptedException { | |
Delivery d; | |
if (block) { | |
d = _queue.take(); | |
} else { | |
d = _queue.poll(); | |
} | |
d = handle(d); | |
if (d == null) { | |
_firstAboveTime = 0; | |
d = NULL_DELIVERY; | |
} else { | |
d.dequeued(); | |
if (d.sojournTime < _targetDelay) { | |
_firstAboveTime = 0; | |
} else { | |
if (_firstAboveTime == 0) { | |
_firstAboveTime = d.dequeuedAt + _interval; | |
} else if (d.dequeuedAt >= _firstAboveTime) { | |
d.dropable = true; | |
} | |
} | |
} | |
return d; | |
} | |
public static class Delivery { | |
private final Envelope _envelope; | |
private final AMQP.BasicProperties _properties; | |
private final byte[] _body; | |
private long enqueuedAt; | |
private boolean dropable = false; | |
public long sojournTime = 0; | |
public long dequeuedAt = 0; | |
public Delivery(Envelope envelope, AMQP.BasicProperties properties, | |
byte[] body) { | |
_envelope = envelope; | |
_properties = properties; | |
_body = body; | |
enqueuedAt = System.currentTimeMillis(); | |
} | |
private void dequeued() { | |
dequeuedAt = System.currentTimeMillis(); | |
sojournTime = dequeuedAt - enqueuedAt; | |
} | |
/** | |
* Retrieve the message envelope. | |
* | |
* @return the message envelope | |
*/ | |
public Envelope getEnvelope() { | |
return _envelope; | |
} | |
/** | |
* Retrieve the message properties. | |
* | |
* @return the message properties | |
*/ | |
public BasicProperties getProperties() { | |
return _properties; | |
} | |
/** | |
* Retrieve the message body. | |
* | |
* @return the message body | |
*/ | |
public byte[] getBody() { | |
return _body; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment