Created
July 10, 2015 18:32
-
-
Save afedotov/c9565ac5a53a45662d30 to your computer and use it in GitHub Desktop.
Camel message processor that implements behavior of .pollEnrich() pattern with dynamic Endpoint URI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.camel.processor; | |
import org.apache.camel.*; | |
import org.apache.camel.util.ExchangeHelper; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.slf4j.helpers.MessageFormatter; | |
import java.util.LinkedList; | |
import java.util.Map; | |
import java.util.concurrent.TimeoutException; | |
/** | |
* | |
* This is message processor that implements behavior of .pollEnrich() pattern, | |
* but with dynamic Endpoint URI, based on any supported Camel Expression. | |
* http://camel.apache.org/expression.html | |
* | |
* There are many common patterns when you need to call .pollEnrich() on dynamic, | |
* not static URI, which based on current Exchange properties/headers/body/etc... | |
* Unfortunately this functionality is not available Out-Of-Box in the Camel 2.x.x | |
* and scheduled to the Camel 3.x.x. | |
* https://issues.apache.org/jira/browse/CAMEL-4596 | |
* | |
* So this Processor is a simple solution that will help to implement such scenario. | |
* | |
* Upon enriching all Exchange Messages are aggregated into single body that contain List of Messages. | |
* | |
* Usage example: | |
* | |
* <code> | |
* | |
* String enrichURI = "file:path/to/basedir/${date:now:yyyyMMdd}?fileName=index.xml"; | |
* | |
* from("direct:start") | |
* .process(DynamicPollEnricher.from(simple(enrichURI))) | |
* .to("bean:messageProcessor") | |
* .to("mock:result"); | |
* | |
* </code> | |
* | |
* | |
*/ | |
public class DynamicPollEnricher implements Processor { | |
private static final Logger log = LoggerFactory.getLogger(DynamicPollEnricher.class); | |
private Expression endpointExpression; | |
private long receiveTimeout; | |
public DynamicPollEnricher() { | |
} | |
public static DynamicPollEnricher from(Expression expression) { | |
return from(expression, -1); | |
} | |
public static DynamicPollEnricher from(Expression expression, long timeout) { | |
DynamicPollEnricher instance = new DynamicPollEnricher(); | |
instance.endpointExpression = expression; | |
instance.receiveTimeout = timeout; | |
return instance; | |
} | |
@Override | |
public void process(Exchange exchange) throws Exception { | |
// Evaluate endpoint Expression in the Exchange context to populate dynamic values | |
String populatedURI = endpointExpression.evaluate(exchange, String.class); | |
log.info("Starting poll-enrich from the URI = {}, receiveTimeout = {}", populatedURI, receiveTimeout); | |
ConsumerTemplate consumerTemplate = exchange.getContext().createConsumerTemplate(); | |
try { | |
// Receive Exchange from resource endpoint | |
Exchange resourceExchange = doReceive(consumerTemplate, populatedURI); | |
// If there are no message received from resource endpoint, we always assume it is erroneous behavior | |
if (resourceExchange == null) | |
throw new TimeoutException(MessageFormatter.format("There is no any message received from endpoint after {} seconds", receiveTimeout).getMessage()); | |
// Aggregate original and resource Exchanges. | |
// The body of original message will contain aggregated MessageList. | |
ExchangeHelper.prepareAggregation(exchange, resourceExchange); | |
MessageList body = convertAndGetBodyAsMessageList(exchange.getIn()); | |
body.add(resourceExchange.getIn()); | |
// Resource Exchange synchronizations are handovered to original Exchange. | |
// So if original Exchange is rolled back, enriching will be rolled back too | |
resourceExchange.handoverCompletions(exchange); | |
} finally { | |
// If we manually create ConsumeTemplate via CamelContext.createConsumerTemplate(), | |
// then we should stop this Service in the end, to avoid threads/memory leakage. | |
consumerTemplate.stop(); | |
} | |
} | |
private Exchange doReceive(ConsumerTemplate consumerTemplate, String uri) { | |
Exchange answer = null; | |
if (receiveTimeout < 0) { | |
log.debug("doReceive() starting blocking receive from endpoint URI = {}", uri); | |
answer = consumerTemplate.receive(uri); | |
} else if (receiveTimeout == 0) { | |
log.debug("doReceive() starting non-blocking receive from endpoint URI = {}", uri); | |
answer = consumerTemplate.receiveNoWait(uri); | |
} else { | |
log.debug("doReceive() starting timed blocking receive from endpoint URI = {}, receiveTimeout = {}", uri, receiveTimeout); | |
answer = consumerTemplate.receive(uri, receiveTimeout); | |
} | |
return answer; | |
} | |
private MessageList convertAndGetBodyAsMessageList(Message message) { | |
Object body = message.getBody(); | |
if (body instanceof MessageList) { | |
return (MessageList) body; | |
} else { | |
MessageList messageList = new MessageList(); | |
messageList.add(message.copy()); | |
message.setBody(messageList); | |
return messageList; | |
} | |
} | |
private class MessageList extends LinkedList<Message> { | |
} | |
// | |
// Helper methods for easily accessing contents of aggregated message body | |
// | |
public static Message getMessage(Exchange exchange, int messageIndex) { | |
return exchange.getIn().getBody(MessageList.class).get(messageIndex); | |
} | |
public static Object getMessageBody(Exchange exchange, int messageIndex) { | |
return getMessage(exchange, messageIndex).getBody(); | |
} | |
public static <T> T getMessageBody(Exchange exchange, int messageIndex, Class<T> type) { | |
return getMessage(exchange, messageIndex).getBody(type); | |
} | |
public static Map<String, Object> getMessageHeaders(Exchange exchange, int messageIndex) { | |
return getMessage(exchange, messageIndex).getHeaders(); | |
} | |
public static Object getMessageHeader(Exchange exchange, int messageIndex, String headerName) { | |
return getMessage(exchange, messageIndex).getHeader(headerName); | |
} | |
public static <T> T getMessageHeader(Exchange exchange, int messageIndex, String headerName, Class<T> type) { | |
return getMessage(exchange, messageIndex).getHeader(headerName, type); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment