Created
May 29, 2013 13:40
-
-
Save johndemic/5670325 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* $Id: UntilSuccessful.java 24377 2012-05-21 12:35:37Z pablo.kraan $ | |
* -------------------------------------------------------------------------------------- | |
* Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com | |
* | |
* The software in this package is published under the terms of the CPAL v1.0 | |
* license, a copy of which has been included with this distribution in the | |
* LICENSE.txt file. | |
*/ | |
package com.foo | |
import org.mule.DefaultMuleEvent; | |
import org.mule.DefaultMuleMessage; | |
import org.mule.VoidMuleEvent; | |
import org.mule.api.MessagingException; | |
import org.mule.api.MuleEvent; | |
import org.mule.api.MuleException; | |
import org.mule.api.MuleMessage; | |
import org.mule.api.MuleRuntimeException; | |
import org.mule.api.endpoint.EndpointBuilder; | |
import org.mule.api.endpoint.EndpointException; | |
import org.mule.api.lifecycle.InitialisationException; | |
import org.mule.api.processor.MessageProcessor; | |
import org.mule.api.retry.RetryCallback; | |
import org.mule.api.retry.RetryContext; | |
import org.mule.api.retry.RetryNotifier; | |
import org.mule.api.retry.RetryPolicyTemplate; | |
import org.mule.api.store.ListableObjectStore; | |
import org.mule.api.store.ObjectStoreException; | |
import org.mule.config.i18n.MessageFactory; | |
import org.mule.message.DefaultExceptionPayload; | |
import org.mule.retry.async.AsynchronousRetryTemplate; | |
import org.mule.retry.policies.SimpleRetryPolicyTemplate; | |
import org.mule.routing.filters.ExpressionFilter; | |
import org.mule.routing.outbound.AbstractOutboundRouter; | |
import org.mule.util.queue.QueueKey; | |
import org.mule.util.store.QueuePersistenceObjectStore; | |
import java.io.Serializable; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* UntilSuccessful attempts to route a message to the message processor it contains in an asynchronous manner. Routing | |
* is considered successful if no exception has been raised and, optionally, if the response matches an expression. | |
* UntilSuccessful can optionally be configured to synchronously return an acknowledgment message when it has scheduled | |
* the event for processing. UntilSuccessful is backed by a {@link ListableObjectStore} for storing the events that are | |
* pending (re)processing. | |
*/ | |
public class RootCausePreservingUntilSuccessful extends AbstractOutboundRouter | |
{ | |
public static final String PROCESS_ATTEMPT_COUNT_PROPERTY_NAME = "process.attempt.count"; | |
private static final int DEFAULT_PROCESS_ATTEMPT_COUNT_PROPERTY_VALUE = 1; | |
private ListableObjectStore<MuleEvent> objectStore; | |
private int maxRetries = 5; | |
private long secondsBetweenRetries = 60L; | |
private String failureExpression; | |
private String ackExpression; | |
private ExpressionFilter failureExpressionFilter; | |
private String eventKeyPrefix; | |
private Object deadLetterQueue; | |
private MessageProcessor dlqMP; | |
@Override | |
public void initialise() throws InitialisationException | |
{ | |
if (routes.isEmpty()) | |
{ | |
throw new InitialisationException( | |
MessageFactory.createStaticMessage("One message processor must be configured within UntilSuccessful."), | |
this); | |
} | |
if (routes.size() > 1) | |
{ | |
throw new InitialisationException( | |
MessageFactory.createStaticMessage("Only one message processor is allowed within UntilSuccessful." | |
+ " Use a Processor Chain to group several message processors into one."), | |
this); | |
} | |
if (objectStore == null) | |
{ | |
throw new InitialisationException( | |
MessageFactory.createStaticMessage("A ListableObjectStore must be configured on UntilSuccessful."), | |
this); | |
} | |
super.initialise(); | |
if (deadLetterQueue != null) | |
{ | |
if (deadLetterQueue instanceof EndpointBuilder) | |
{ | |
try | |
{ | |
dlqMP = ((EndpointBuilder) deadLetterQueue).buildOutboundEndpoint(); | |
} | |
catch (final EndpointException ee) | |
{ | |
throw new InitialisationException( | |
MessageFactory.createStaticMessage("deadLetterQueue-ref is not a valid endpoint builder: " | |
+ deadLetterQueue), ee, this); | |
} | |
} | |
else if (deadLetterQueue instanceof MessageProcessor) | |
{ | |
dlqMP = (MessageProcessor) deadLetterQueue; | |
} | |
else | |
{ | |
throw new InitialisationException( | |
MessageFactory.createStaticMessage("deadLetterQueue-ref is not a valid mesage processor: " | |
+ deadLetterQueue), null, this); | |
} | |
} | |
if (failureExpression != null) | |
{ | |
failureExpressionFilter = new ExpressionFilter(failureExpression); | |
} | |
else | |
{ | |
failureExpressionFilter = new ExpressionFilter("exception-type:"); | |
} | |
failureExpressionFilter.setMuleContext(muleContext); | |
if ((ackExpression != null) && (!muleContext.getExpressionManager().isExpression(ackExpression))) | |
{ | |
throw new InitialisationException(MessageFactory.createStaticMessage("Invalid ackExpression: " | |
+ ackExpression), this); | |
} | |
String flowName = flowConstruct.getName(); | |
String clusterId = muleContext.getClusterId(); | |
eventKeyPrefix = flowName + "-" + clusterId + "-"; | |
} | |
@Override | |
public void start() throws MuleException | |
{ | |
super.start(); | |
scheduleAllPendingEventsForProcessing(); | |
} | |
@Override | |
public boolean isMatch(final MuleMessage message) throws MuleException | |
{ | |
return true; | |
} | |
@Override | |
protected MuleEvent route(final MuleEvent event) throws MessagingException | |
{ | |
try | |
{ | |
ensurePayloadSerializable(event); | |
} | |
catch (final Exception e) | |
{ | |
throw new MessagingException( | |
MessageFactory.createStaticMessage("Failed to prepare message for processing"), event, e, this); | |
} | |
try | |
{ | |
final Serializable eventStoreKey = storeEvent(event); | |
scheduleForProcessing(eventStoreKey); | |
if (ackExpression == null) | |
{ | |
return VoidMuleEvent.getInstance(); | |
} | |
final Object ackResponsePayload = muleContext.getExpressionManager().evaluate(ackExpression, | |
event); | |
return new DefaultMuleEvent(new DefaultMuleMessage(ackResponsePayload, event.getMessage(), | |
muleContext), event); | |
} | |
catch (final Exception e) | |
{ | |
throw new MessagingException( | |
MessageFactory.createStaticMessage("Failed to schedule the event for processing"), event, e, this); | |
} | |
} | |
private void scheduleAllPendingEventsForProcessing() throws ObjectStoreException | |
{ | |
for (final Serializable eventStoreKey : objectStore.allKeys()) | |
{ | |
try | |
{ | |
scheduleForProcessing(eventStoreKey); | |
} | |
catch (final Exception e) | |
{ | |
logger.error( | |
MessageFactory.createStaticMessage("Failed to schedule for processing event stored with key: " | |
+ eventStoreKey), e); | |
} | |
} | |
} | |
private void scheduleForProcessing(final Serializable eventStoreKey) throws Exception | |
{ | |
final RetryCallback callback = new RetryCallback() | |
{ | |
@Override | |
public String getWorkDescription() | |
{ | |
return "Until successful processing of event stored under key: " + eventStoreKey; | |
} | |
@Override | |
public void doWork(final RetryContext context) throws Exception | |
{ | |
retrieveAndProcessEvent(eventStoreKey); | |
} | |
}; | |
final SimpleRetryPolicyTemplate simpleRetryPolicyTemplate = new SimpleRetryPolicyTemplate( | |
TimeUnit.SECONDS.toMillis(secondsBetweenRetries), maxRetries); | |
final RetryPolicyTemplate retryPolicyTemplate = new AsynchronousRetryTemplate( | |
simpleRetryPolicyTemplate); | |
retryPolicyTemplate.setNotifier(new RetryNotifier() | |
{ | |
@Override | |
public void onSuccess(final RetryContext context) | |
{ | |
removeFromStore(eventStoreKey); | |
} | |
@Override | |
public void onFailure(final RetryContext context, final Throwable e) | |
{ | |
incrementProcessAttemptCountOrRemoveFromStore(eventStoreKey, e); | |
} | |
}); | |
retryPolicyTemplate.execute(callback, muleContext.getWorkManager()); | |
} | |
private Serializable storeEvent(final MuleEvent event) throws ObjectStoreException | |
{ | |
final MuleMessage message = event.getMessage(); | |
final Integer deliveryAttemptCount = message.getInvocationProperty( | |
PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, DEFAULT_PROCESS_ATTEMPT_COUNT_PROPERTY_VALUE); | |
return storeEvent(event, deliveryAttemptCount); | |
} | |
private Serializable storeEvent(final MuleEvent event, final int deliveryAttemptCount) | |
throws ObjectStoreException | |
{ | |
final MuleMessage message = event.getMessage(); | |
message.setInvocationProperty(PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, deliveryAttemptCount); | |
final Serializable eventStoreKey = buildQueueKey(event); | |
objectStore.store(eventStoreKey, event); | |
return eventStoreKey; | |
} | |
public static Serializable buildQueueKey(final MuleEvent muleEvent) | |
{ | |
// the key is built in way to prevent UntilSuccessful workers across a cluster to compete for the same | |
// events over a shared object store | |
String key = muleEvent.getFlowConstruct() + "-" + muleEvent.getMuleContext().getClusterId() + "-" + muleEvent.getId(); | |
return new QueueKey(QueuePersistenceObjectStore.DEFAULT_QUEUE_STORE, key); | |
} | |
private void incrementProcessAttemptCountOrRemoveFromStore(final Serializable eventStoreKey, Throwable rootCause) | |
{ | |
try | |
{ | |
final MuleEvent event = objectStore.remove(eventStoreKey); | |
final MuleEvent mutableEvent = threadSafeCopy(event); | |
final MuleMessage message = mutableEvent.getMessage(); | |
final Integer deliveryAttemptCount = message.getInvocationProperty( | |
PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, DEFAULT_PROCESS_ATTEMPT_COUNT_PROPERTY_VALUE); | |
if (deliveryAttemptCount <= getMaxRetries()) | |
{ | |
// we store the incremented version unless the max attempt count has been reached | |
message.setInvocationProperty(PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, deliveryAttemptCount + 1); | |
objectStore.store(eventStoreKey, mutableEvent); | |
} | |
else | |
{ | |
mutableEvent.getMessage().setExceptionPayload(new DefaultExceptionPayload(rootCause)); | |
abandonRetries(event, mutableEvent); | |
} | |
} | |
catch (final ObjectStoreException ose) | |
{ | |
logger.error("Failed to increment failure count for event stored with key: " + eventStoreKey); | |
} | |
} | |
private void abandonRetries(final MuleEvent event, final MuleEvent mutableEvent) | |
{ | |
if (dlqMP == null) | |
{ | |
logger.info("Retry attempts exhausted and no DLQ defined, dropping message: " + event); | |
return; | |
} | |
try | |
{ | |
logger.info("Retry attempts exhausted, routing message to DLQ: " + dlqMP); | |
dlqMP.process(mutableEvent); | |
} | |
catch (final MuleException me) | |
{ | |
logger.error("Failed to route message to DLQ: " + dlqMP + ", dropping message: " + event, me); | |
} | |
} | |
private void removeFromStore(final Serializable eventStoreKey) | |
{ | |
try | |
{ | |
objectStore.remove(eventStoreKey); | |
} | |
catch (final ObjectStoreException ose) | |
{ | |
logger.warn("Failed to remove following event from store with key: " + eventStoreKey); | |
} | |
} | |
private void retrieveAndProcessEvent(final Serializable eventStoreKey) throws ObjectStoreException | |
{ | |
final MuleEvent persistedEvent = objectStore.retrieve(eventStoreKey); | |
final MuleEvent mutableEvent = threadSafeCopy(persistedEvent); | |
processEvent(mutableEvent); | |
} | |
private void processEvent(final MuleEvent event) | |
{ | |
if (routes.isEmpty()) | |
{ | |
return; | |
} | |
MuleEvent returnEvent; | |
try | |
{ | |
returnEvent = routes.get(0).process(event); | |
} | |
catch (final MuleException me) | |
{ | |
throw new MuleRuntimeException(me); | |
} | |
if (returnEvent == null || VoidMuleEvent.getInstance().equals(returnEvent)) | |
{ | |
return; | |
} | |
final MuleMessage msg = returnEvent.getMessage(); | |
if (msg == null) | |
{ | |
throw new MuleRuntimeException( | |
MessageFactory.createStaticMessage("No message found in response to processing, which is therefore considered failed for event: " | |
+ event)); | |
} | |
final boolean errorDetected = failureExpressionFilter.accept(msg); | |
if (errorDetected) | |
{ | |
throw new MuleRuntimeException( | |
MessageFactory.createStaticMessage("Failure expression positive when processing event: " | |
+ event)); | |
} | |
} | |
private DefaultMuleEvent threadSafeCopy(final MuleEvent event) | |
{ | |
final DefaultMuleMessage message = new DefaultMuleMessage(event.getMessage().getPayload(), | |
event.getMessage(), muleContext); | |
return new DefaultMuleEvent(message, event); | |
} | |
private void ensurePayloadSerializable(final MuleEvent event) throws Exception | |
{ | |
final MuleMessage message = event.getMessage(); | |
if (message instanceof DefaultMuleMessage) | |
{ | |
if (((DefaultMuleMessage) message).isConsumable()) | |
{ | |
message.getPayloadAsBytes(); | |
} | |
} | |
else | |
{ | |
message.getPayloadAsBytes(); | |
} | |
} | |
public ListableObjectStore<MuleEvent> getObjectStore() | |
{ | |
return objectStore; | |
} | |
public void setObjectStore(final ListableObjectStore<MuleEvent> objectStore) | |
{ | |
this.objectStore = objectStore; | |
} | |
public int getMaxRetries() | |
{ | |
return maxRetries; | |
} | |
public void setMaxRetries(final int maxRetries) | |
{ | |
this.maxRetries = maxRetries; | |
} | |
public long getSecondsBetweenRetries() | |
{ | |
return secondsBetweenRetries; | |
} | |
public void setSecondsBetweenRetries(final long secondsBetweenRetries) | |
{ | |
this.secondsBetweenRetries = secondsBetweenRetries; | |
} | |
public String getFailureExpression() | |
{ | |
return failureExpression; | |
} | |
public void setFailureExpression(final String failureExpression) | |
{ | |
this.failureExpression = failureExpression; | |
} | |
public String getAckExpression() | |
{ | |
return ackExpression; | |
} | |
public void setAckExpression(final String ackExpression) | |
{ | |
this.ackExpression = ackExpression; | |
} | |
public void setDeadLetterQueue(final Object deadLetterQueue) | |
{ | |
this.deadLetterQueue = deadLetterQueue; | |
} | |
public Object getDeadLetterQueue() | |
{ | |
return deadLetterQueue; | |
} | |
public String getEventKeyPrefix() | |
{ | |
return eventKeyPrefix; | |
} | |
public ExpressionFilter getFailureExpressionFilter() | |
{ | |
return failureExpressionFilter; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<flow name="untilSuccessful"> | |
<vm:inbound-endpoint path="in"/> | |
<custom-router class="com.pennmutual.springimporttest.RootCausePreservingUntilSuccessful"> | |
<script:component> | |
<script:script engine="groovy"> | |
throw new RuntimeException("foo") | |
</script:script> | |
</script:component> | |
<spring:property name="objectStore" ref="objectStore"/> | |
<spring:property name="maxRetries" value="2"/> | |
<spring:property name="secondsBetweenRetries" value="1"/> | |
<spring:property name="deadLetterQueue" ref="DLQ"/> | |
</custom-router> | |
</flow> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment