Skip to content

Instantly share code, notes, and snippets.

@johndemic
Created February 15, 2012 19:19
Show Gist options
  • Save johndemic/1838281 to your computer and use it in GitHub Desktop.
Save johndemic/1838281 to your computer and use it in GitHub Desktop.
/**
 * Outbound router that hands every message to its first configured route after a fixed delay.
 *
 * <p>{@link #route(MuleEvent)} schedules the dispatch on a shared scheduler and then
 * <em>blocks the calling thread</em> until the delayed dispatch has completed (or the wait
 * times out), so the caller observes the delay synchronously.
 *
 * <p>Only {@code routes.get(0)} is ever used; any additional routes are ignored.
 */
public class DelayingOutboundRouter extends AbstractOutboundRouter {

    /** Number of threads in the shared scheduler. */
    private static final int SCHEDULER_THREADS = 5;

    /** How long a message is held back before it is dispatched, in seconds. */
    private static final long DISPATCH_DELAY_SECONDS = 5;

    /** Upper bound on how long {@code route} waits for the delayed dispatch, in seconds. */
    private static final long RESULT_WAIT_SECONDS = 60;

    /**
     * Value written to {@code MULE_EVENT_TIMEOUT_PROPERTY} on the outgoing message.
     * NOTE(review): presumably milliseconds - confirm against the Mule documentation.
     */
    private static final int EVENT_TIMEOUT = 2000;

    /**
     * Scheduler shared by all instances of this router. It is created once at class-load
     * time and is never shut down by this class.
     */
    static final ScheduledExecutorService executorService =
            Executors.newScheduledThreadPool(SCHEDULER_THREADS);

    /**
     * Schedules the delayed dispatch of {@code event} to the first route and waits for its result.
     *
     * @param event the event whose message is to be routed
     * @return the event produced by the route, or {@code null} when the route is an outbound
     *         endpoint whose filter rejected the message
     * @throws MessagingException (as a {@code RoutingException}) when the dispatch fails, the
     *         wait times out, or the waiting thread is interrupted
     */
    @Override
    protected MuleEvent route(MuleEvent event) throws MessagingException {
        // Look the target up once so that scheduling and error reporting agree on it.
        MessageProcessor target = routes.get(0);
        Future<MuleEvent> scheduled = executorService.schedule(
                new MessageProcessorRunner(event.getMessage(), target, event),
                DISPATCH_DELAY_SECONDS, TimeUnit.SECONDS);
        try {
            return scheduled.get(RESULT_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // The caller gave up: stop the pending dispatch and keep the interrupt visible
            // to code further up the stack instead of silently clearing it.
            scheduled.cancel(true);
            Thread.currentThread().interrupt();
            throw new RoutingException(event, target, e);
        } catch (Exception e) {
            // On a timeout this prevents the message from still being sent after failure has
            // been reported; if the task already finished, cancel() has no effect.
            scheduled.cancel(true);
            throw new RoutingException(event, target, e);
        }
    }

    /**
     * This router accepts every message.
     *
     * @param message the candidate message (not inspected)
     * @return always {@code true}
     */
    @Override
    public boolean isMatch(MuleMessage message) throws MuleException {
        return true;
    }

    /**
     * Task run by the scheduler: applies the endpoint filter (if any), clones the message and
     * sends it to the route.
     *
     * <p>Deliberately a non-static inner class: it calls the enclosing router's
     * {@code cloneMessage} and {@code sendRequest}.
     */
    class MessageProcessorRunner implements Callable<MuleEvent> {

        private final MuleMessage message;
        private final MessageProcessor route;
        private final MuleEvent event;

        /**
         * @param message the message to dispatch
         * @param route   the processor the message is sent to
         * @param event   the originating event, passed on to {@code sendRequest} and used for
         *                error reporting
         */
        MessageProcessorRunner(MuleMessage message, MessageProcessor route, MuleEvent event) {
            this.message = message;
            this.route = route;
            this.event = event;
        }

        /**
         * Sends a clone of the message to the route unless an endpoint filter rejects it.
         *
         * @return the event returned by {@code sendRequest}, or {@code null} when the route is
         *         an {@code OutboundEndpoint} whose filter does not accept the message
         * @throws MessagingException when the message is reported as consumable, in which case
         *         it is not cloned or sent
         */
        @Override
        public MuleEvent call() throws Exception {
            if (!passesEndpointFilter()) {
                return null;
            }
            if (((DefaultMuleMessage) message).isConsumable()) {
                throw new MessagingException(
                        CoreMessages.cannotCopyStreamPayload(message.getPayload().getClass().getName()),
                        event);
            }
            MuleMessage clonedMessage = cloneMessage(message);
            clonedMessage.setOutboundProperty(MuleProperties.MULE_EVENT_TIMEOUT_PROPERTY, EVENT_TIMEOUT);
            return sendRequest(event, clonedMessage, route, true);
        }

        /**
         * Returns {@code true} when the route is not an outbound endpoint, has no filter, or
         * has a filter that accepts the message.
         */
        private boolean passesEndpointFilter() {
            if (!(route instanceof OutboundEndpoint)) {
                return true;
            }
            Filter filter = ((OutboundEndpoint) route).getFilter();
            return filter == null || filter.accept(message);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment