Skip to content

Instantly share code, notes, and snippets.

@pedrotoliveira
Last active August 29, 2015 14:04
Show Gist options
  • Save pedrotoliveira/9eb2b1932efe9274ba97 to your computer and use it in GitHub Desktop.
Save pedrotoliveira/9eb2b1932efe9274ba97 to your computer and use it in GitHub Desktop.
onMessage method
/**
 * Processes one delivery received on {@code channel}.
 * <p>
 * When the consumer is enabled, the message is converted with {@code getObject}, passed through
 * {@code beforeOnMessage}, {@code onMessage(T)} and {@code afterOnMessage}, acknowledged with
 * {@code basicAck}, and finally answered through {@code sendReplyMessage}. When the consumer is
 * not enabled, nothing is done with the delivery in this method.
 * <p>
 * Failure handling:
 * <ul>
 * <li>{@link ClassCastException}, {@code MessageConversionException},
 * {@link IllegalArgumentException} and {@link NullPointerException} are logged as an unknown
 * message and the delivery is rejected without requeue. The reject is skipped when there is no
 * message to reject ({@code message == null}) or when the delivery was already acknowledged.</li>
 * <li>Any other {@link RuntimeException} is logged as an unexpected error, then
 * {@code cleanReplyToAddress} and {@code redeliveryMessage} are invoked.</li>
 * </ul>
 *
 * @param message the received message; a {@code null} value is logged and otherwise ignored
 * @param channel the channel the message was delivered on, used to ack or reject it
 * @throws IOException propagated from the channel operations and helpers invoked here
 */
@Override
public final void onMessage(final Message message, final Channel channel) throws IOException {
    // Set once basicAck has been sent, so a later failure never rejects the same delivery tag.
    boolean acknowledged = false;
    try {
        logger.all().logInfo(String.format("Received Message: %s", message));
        Assert.notNull(message, "Unknow message: " + message);
        if (consumerEnabled()) {
            T object = getObject(message);
            this.beforeOnMessage(object);
            R response = this.onMessage(object);
            this.afterOnMessage(object);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            acknowledged = true;
            sendReplyMessage(message, channel, response);
        }
    } catch (ClassCastException | MessageConversionException | IllegalArgumentException | NullPointerException ex) {
        logger.file().logError(String.format("Unknow message: message=%s error=%s", message, ex.getMessage()), ex);
        logger.splunk().logError(String.format("Unknow message: message=%s error=%s", message, ex.getMessage()));
        // A null message has no delivery tag to reject, and an acknowledged delivery must not be
        // rejected again; in both cases the failure has already been logged above.
        if (message != null && !acknowledged) {
            rejectMessage(message, channel, false);
        }
    } catch (RuntimeException ex) {
        logger.file().logError(String.format("Unexpected Error: message=%s error=%s", message, ex.getMessage()), ex);
        logger.splunk().logError(String.format("Unexpected Error: message=%s error=%s", message, ex.getMessage()));
        // NOTE(review): this path can also run after basicAck (when sendReplyMessage fails).
        // Whether redeliveryMessage is safe for an already-acknowledged delivery depends on its
        // implementation, which is not visible here - confirm.
        cleanReplyToAddress(message);
        redeliveryMessage(message, channel);
    }
}
/**
 * Rejects the given message on the channel it was delivered on.
 *
 * @param message the message to reject; its delivery tag identifies the delivery
 * @param channel the channel on which {@code basicReject} is issued
 * @param requeue the requeue flag, passed unchanged to {@code basicReject}
 * @throws IOException if {@code basicReject} fails
 */
private void rejectMessage(final Message message, final Channel channel, boolean requeue) throws IOException {
    final long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicReject(deliveryTag, requeue);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment