Last active
May 11, 2019 16:25
-
-
Save bedlaj/a2a56aa9291bced8c0a8edebacaf22b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eu.janbednar.stackoverflow; | |
import org.apache.camel.Exchange; | |
import org.apache.camel.Expression; | |
import org.apache.camel.builder.RouteBuilder; | |
import org.apache.camel.component.mock.MockEndpoint; | |
import org.apache.camel.test.junit4.CamelTestSupport; | |
import org.apache.camel.util.toolbox.AggregationStrategies; | |
import org.junit.Assert; | |
import org.junit.Test; | |
import java.io.File; | |
import java.util.List; | |
public class AggregatorCorrelationExpressionTest extends CamelTestSupport { | |
private final Expression CORRELATION_EXPRESSION = new Expression() { | |
@Override | |
public <T> T evaluate(Exchange exchange, Class<T> type) { | |
final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class); | |
final String correlationExpression = fileName.substring(0, fileName.indexOf('_')); | |
return exchange.getContext().getTypeConverter().convertTo( | |
type, | |
correlationExpression | |
); | |
} | |
}; | |
@Override | |
protected RouteBuilder createRouteBuilder() throws Exception { | |
return new RouteBuilder() { | |
@Override | |
public void configure() throws Exception { | |
from("file:inputDirectory") | |
.aggregate(CORRELATION_EXPRESSION, AggregationStrategies.groupedExchange()) | |
.completionSize(2)//Wait for two files | |
.completionTimeout(60000)//Or process single file, if completionSize was not fullified within one minute | |
.to("log:do_something") | |
.to("mock:result")//Here you can do anything. You have here List<Exchange> in message body | |
; | |
} | |
}; | |
} | |
@Test | |
public void testCorrelationExpression() throws Exception{ | |
MockEndpoint result = getMockEndpoint("mock:result"); | |
result.expectedMessageCount(2); | |
result.setResultWaitTime(20000); | |
Assert.assertTrue(new File("inputDirectory", "group1_1.txt").createNewFile()); | |
Assert.assertTrue(new File("inputDirectory", "group2_1.txt").createNewFile()); | |
Assert.assertTrue(new File("inputDirectory", "group1_2.txt").createNewFile()); | |
Assert.assertTrue(new File("inputDirectory", "group2_2.txt").createNewFile()); | |
result.assertIsSatisfied(); | |
Exchange group1 = result.getReceivedExchanges().get(0); | |
Exchange group2 = result.getReceivedExchanges().get(1); | |
Assert.assertEquals(2, group1.getIn().getBody(List.class).size()); | |
Assert.assertEquals(2, group2.getIn().getBody(List.class).size()); | |
if (((List<Exchange>)group1.getIn().getBody(List.class)) | |
.get(0).getIn().getHeader(Exchange.FILE_NAME, String.class) | |
.startsWith("group1")) { | |
Assert.assertTrue( | |
((List<Exchange>)group1.getIn().getBody(List.class)).stream().allMatch( | |
exchange -> exchange.getIn().getHeader(Exchange.FILE_NAME, String.class).startsWith("group1") | |
) | |
); | |
} | |
if (((List<Exchange>)group1.getIn().getBody(List.class)) | |
.get(0).getIn().getHeader(Exchange.FILE_NAME, String.class) | |
.startsWith("group2")) { | |
Assert.assertTrue( | |
((List<Exchange>)group1.getIn().getBody(List.class)).stream().allMatch( | |
exchange -> exchange.getIn().getHeader(Exchange.FILE_NAME, String.class).startsWith("group2") | |
) | |
); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment