Skip to content

Instantly share code, notes, and snippets.

@afedotov
Created July 10, 2015 18:32
Show Gist options
  • Save afedotov/c9565ac5a53a45662d30 to your computer and use it in GitHub Desktop.
Save afedotov/c9565ac5a53a45662d30 to your computer and use it in GitHub Desktop.
Camel message processor that implements behavior of .pollEnrich() pattern with dynamic Endpoint URI
package org.apache.camel.processor;
import org.apache.camel.*;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
*
* This is message processor that implements behavior of .pollEnrich() pattern,
* but with dynamic Endpoint URI, based on any supported Camel Expression.
* http://camel.apache.org/expression.html
*
* There are many common patterns when you need to call .pollEnrich() on dynamic,
* not static URI, which based on current Exchange properties/headers/body/etc...
* Unfortunately this functionality is not available Out-Of-Box in the Camel 2.x.x
* and scheduled to the Camel 3.x.x.
* https://issues.apache.org/jira/browse/CAMEL-4596
*
* So this Processor is a simple solution that will help to implement such scenario.
*
* Upon enriching all Exchange Messages are aggregated into single body that contain List of Messages.
*
* Usage example:
*
* <code>
*
* String enrichURI = "file:path/to/basedir/${date:now:yyyyMMdd}?fileName=index.xml";
*
* from("direct:start")
* .process(DynamicPollEnricher.from(simple(enrichURI)))
* .to("bean:messageProcessor")
* .to("mock:result");
*
* </code>
*
*
*/
public class DynamicPollEnricher implements Processor {
private static final Logger log = LoggerFactory.getLogger(DynamicPollEnricher.class);
private Expression endpointExpression;
private long receiveTimeout;
public DynamicPollEnricher() {
}
public static DynamicPollEnricher from(Expression expression) {
return from(expression, -1);
}
public static DynamicPollEnricher from(Expression expression, long timeout) {
DynamicPollEnricher instance = new DynamicPollEnricher();
instance.endpointExpression = expression;
instance.receiveTimeout = timeout;
return instance;
}
@Override
public void process(Exchange exchange) throws Exception {
// Evaluate endpoint Expression in the Exchange context to populate dynamic values
String populatedURI = endpointExpression.evaluate(exchange, String.class);
log.info("Starting poll-enrich from the URI = {}, receiveTimeout = {}", populatedURI, receiveTimeout);
ConsumerTemplate consumerTemplate = exchange.getContext().createConsumerTemplate();
try {
// Receive Exchange from resource endpoint
Exchange resourceExchange = doReceive(consumerTemplate, populatedURI);
// If there are no message received from resource endpoint, we always assume it is erroneous behavior
if (resourceExchange == null)
throw new TimeoutException(MessageFormatter.format("There is no any message received from endpoint after {} seconds", receiveTimeout).getMessage());
// Aggregate original and resource Exchanges.
// The body of original message will contain aggregated MessageList.
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
MessageList body = convertAndGetBodyAsMessageList(exchange.getIn());
body.add(resourceExchange.getIn());
// Resource Exchange synchronizations are handovered to original Exchange.
// So if original Exchange is rolled back, enriching will be rolled back too
resourceExchange.handoverCompletions(exchange);
} finally {
// If we manually create ConsumeTemplate via CamelContext.createConsumerTemplate(),
// then we should stop this Service in the end, to avoid threads/memory leakage.
consumerTemplate.stop();
}
}
private Exchange doReceive(ConsumerTemplate consumerTemplate, String uri) {
Exchange answer = null;
if (receiveTimeout < 0) {
log.debug("doReceive() starting blocking receive from endpoint URI = {}", uri);
answer = consumerTemplate.receive(uri);
} else if (receiveTimeout == 0) {
log.debug("doReceive() starting non-blocking receive from endpoint URI = {}", uri);
answer = consumerTemplate.receiveNoWait(uri);
} else {
log.debug("doReceive() starting timed blocking receive from endpoint URI = {}, receiveTimeout = {}", uri, receiveTimeout);
answer = consumerTemplate.receive(uri, receiveTimeout);
}
return answer;
}
private MessageList convertAndGetBodyAsMessageList(Message message) {
Object body = message.getBody();
if (body instanceof MessageList) {
return (MessageList) body;
} else {
MessageList messageList = new MessageList();
messageList.add(message.copy());
message.setBody(messageList);
return messageList;
}
}
private class MessageList extends LinkedList<Message> {
}
//
// Helper methods for easily accessing contents of aggregated message body
//
public static Message getMessage(Exchange exchange, int messageIndex) {
return exchange.getIn().getBody(MessageList.class).get(messageIndex);
}
public static Object getMessageBody(Exchange exchange, int messageIndex) {
return getMessage(exchange, messageIndex).getBody();
}
public static <T> T getMessageBody(Exchange exchange, int messageIndex, Class<T> type) {
return getMessage(exchange, messageIndex).getBody(type);
}
public static Map<String, Object> getMessageHeaders(Exchange exchange, int messageIndex) {
return getMessage(exchange, messageIndex).getHeaders();
}
public static Object getMessageHeader(Exchange exchange, int messageIndex, String headerName) {
return getMessage(exchange, messageIndex).getHeader(headerName);
}
public static <T> T getMessageHeader(Exchange exchange, int messageIndex, String headerName, Class<T> type) {
return getMessage(exchange, messageIndex).getHeader(headerName, type);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment