Apache Camel is a tool designed to implement Enterprise Integration Patterns in a simple, consistent, and repeatable manner.
Camel is a framework written for the Java Virtual Machine, and it provides many components that can be used to implement your business processes. For example, you could translate XML-encoded messages into JSON-encoded messages.
Using Camel's Java DSL, such a message translator might look like this:
package com.redhat.poc.camel;

import java.util.HashMap;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

/**
 * A Camel Java8 DSL Message Translator
 */
public class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() {
        from("jms:queue:XML.Topic")               // Read XML messages from the inbound queue
            .unmarshal().jacksonxml(HashMap.class) // Parse the XML into a HashMap
            .marshal().json(true)                  // Serialize the map as pretty-printed JSON
            .to("jms:queue:JSON.Topic");           // Write the JSON to the outbound queue
    }
}
The Camel team publishes updated Maven Archetypes for every release. This means that creating a new Camel project is quite simple.
$ mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes -DarchetypeArtifactId=camel-archetype-java -DarchetypeVersion=3.8.0
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.inject.internal.cglib.core.$ReflectUtils$1 (file:/usr/share/maven/lib/guice.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of com.google.inject.internal.cglib.core.$ReflectUtils$1
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:3.2.0:generate (default-cli) > generate-sources @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:3.2.0:generate (default-cli) < generate-sources @ standalone-pom <<<
[INFO]
[INFO]
[INFO] --- maven-archetype-plugin:3.2.0:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] Archetype repository not defined. Using the one from [org.apache.camel.archetypes:camel-archetype-java:3.8.0] found in catalog remote
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/camel/archetypes/camel-archetype-java/3.8.0/camel-archetype-java-3.8.0.pom
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/camel/archetypes/camel-archetype-java/3.8.0/camel-archetype-java-3.8.0.pom (2.4 kB at 21 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/camel/archetypes/camel-archetype-java/3.8.0/camel-archetype-java-3.8.0.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/camel/archetypes/camel-archetype-java/3.8.0/camel-archetype-java-3.8.0.jar (9.9 kB at 90 kB/s)
Define value for property 'groupId': com.redhat.poc.camel
Define value for property 'artifactId': message-transformer
Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0-SNAPSHOT
Define value for property 'package' com.redhat.poc.camel: :
[INFO] Using property: camel-version = 3.3.0
[INFO] Using property: exec-maven-plugin-version = 1.6.0
[INFO] Using property: maven-compiler-plugin-version = 3.8.1
[INFO] Using property: maven-resources-plugin-version = 3.1.0
Confirm properties configuration:
groupId: com.redhat.poc.camel
artifactId: message-transformer
version: 1.0.0-SNAPSHOT
package: com.redhat.poc.camel
camel-version: 3.8.0
exec-maven-plugin-version: 1.6.0
maven-compiler-plugin-version: 3.8.1
maven-resources-plugin-version: 3.1.0
Y: :
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: camel-archetype-java:3.8.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.redhat.poc.camel
[INFO] Parameter: artifactId, Value: message-transformer
[INFO] Parameter: version, Value: 1.0.0-SNAPSHOT
[INFO] Parameter: package, Value: com.redhat.poc.camel
[INFO] Parameter: packageInPathFormat, Value: com/redhat/poc/camel
[INFO] Parameter: maven-resources-plugin-version, Value: 3.1.0
[INFO] Parameter: package, Value: com.redhat.poc.camel
[INFO] Parameter: groupId, Value: com.redhat.poc.camel
[INFO] Parameter: maven-compiler-plugin-version, Value: 3.8.1
[INFO] Parameter: artifactId, Value: message-transformer
[INFO] Parameter: camel-version, Value: 3.3.0
[INFO] Parameter: version, Value: 1.0.0-SNAPSHOT
[INFO] Parameter: exec-maven-plugin-version, Value: 1.6.0
[INFO] Project created from Archetype in dir: /home/dphillips/message-transformer
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 30.327 s
[INFO] Finished at: 2021-01-28T13:06:18-05:00
[INFO] ------------------------------------------------------------------------
Once the archetype has finished, you will have a Maven project with two Java classes. One is the "Main" class, which is the entry point for the application. The other is the "MyRouteBuilder" class, which is where we define our "route".
Camel applications are made up of Routes. A Route is made of at least 2 Endpoints. There is a from endpoint where events flow into the route, and there are one or more to endpoints where events flow out of the route.
A Route runs in a CamelContext, which is the Camel runtime system. You typically have one CamelContext object in an application. A typical application executes the following steps:
- Create a CamelContext object.
- Add endpoints – and possibly components, which are discussed in Section 4.5 ("Components") – to the CamelContext object.
- Add routes to the CamelContext object to connect the endpoints.
- Invoke the start() operation on the CamelContext object. This starts Camel-internal threads that are used to process the sending, receiving and processing of messages in the endpoints.
- Eventually invoke the stop() operation on the CamelContext object. Doing this gracefully stops all the endpoints and Camel-internal threads.
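The steps above can be pictured with a toy model in plain Java. This is deliberately not Camel's API: ToyContext, addRoute, send, and delivered are hypothetical names invented for this sketch. It only assumes that "starting" means launching an internal worker thread and that "stopping gracefully" means letting already-accepted messages finish before the thread exits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy stand-in for a Camel context. NOT Camel's API; it only illustrates the lifecycle.
class ToyContext {
    private final List<String> delivered = new ArrayList<>(); // plays the role of the "to" endpoint
    private Function<String, String> route;                   // processing between "from" and "to"
    private ExecutorService worker;                           // the context's internal thread

    // Add a route that connects the endpoints.
    void addRoute(Function<String, String> route) {
        this.route = route;
    }

    // Start the internal thread that processes messages.
    void start() {
        worker = Executors.newSingleThreadExecutor();
    }

    // Plays the role of the "from" endpoint: hand a message to the route.
    void send(String body) {
        worker.execute(() -> {
            String result = route.apply(body);
            synchronized (delivered) {
                delivered.add(result);
            }
        });
    }

    // Stop gracefully: messages that were already accepted are still processed.
    void stop() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Snapshot of everything that reached the "to" side.
    List<String> delivered() {
        synchronized (delivered) {
            return new ArrayList<>(delivered);
        }
    }

    public static void main(String[] args) {
        ToyContext context = new ToyContext();   // create the context
        context.addRoute(String::toUpperCase);   // add a route (endpoints are implicit in this toy)
        context.start();                         // start the internal thread
        context.send("hello");
        context.send("camel");
        context.stop();                          // graceful stop
        System.out.println(context.delivered()); // prints [HELLO, CAMEL]
    }
}
```

In a real application the CamelContext does this bookkeeping for you, and the route is described with the DSL rather than a single function.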
Between the from and to endpoints, routes are composed of various integration patterns, such as filters, message translators, and content-based routers.
In Camel, each of these takes parameters that tell it how to accomplish the goal you want to achieve. For example, a from endpoint which needs to read from a JMS message queue would have a JMS bean name and topic information like "jms:queue:Queue.Topic". Or, if you were reading a stream of Tweets from Twitter, it might be something like from("twitter"), with the remaining parameters either supplied through your system properties or added as query parameters after "twitter".
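To make the shape of these endpoint strings concrete, here is a small plain-Java sketch that splits one into a component name, a component-specific path, and query options. It is illustrative only: Camel has its own URI handling, and EndpointUriSketch is a hypothetical class written for this example, not part of Camel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: this is not Camel's parser.
class EndpointUriSketch {
    final String scheme;               // component name, e.g. "jms" or "timer"
    final String path;                 // component-specific remainder, e.g. "queue:Queue.Topic"
    final Map<String, String> options; // query parameters after '?'

    EndpointUriSketch(String scheme, String path, Map<String, String> options) {
        this.scheme = scheme;
        this.path = path;
        this.options = options;
    }

    static EndpointUriSketch parse(String uri) {
        // Split off the query string first, so a bare "twitter?key=value" still works.
        int question = uri.indexOf('?');
        String head = question < 0 ? uri : uri.substring(0, question);
        String query = question < 0 ? "" : uri.substring(question + 1);

        // Everything before the first ':' names the component.
        int colon = head.indexOf(':');
        String scheme = colon < 0 ? head : head.substring(0, colon);
        String path = colon < 0 ? "" : head.substring(colon + 1);

        // Collect key=value pairs in the order they appear.
        Map<String, String> options = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                options.put(pair, "");
            } else {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return new EndpointUriSketch(scheme, path, options);
    }

    public static void main(String[] args) {
        EndpointUriSketch jms = parse("jms:queue:Queue.Topic");
        System.out.println(jms.scheme + " | " + jms.path + " | " + jms.options);
        // prints: jms | queue:Queue.Topic | {}

        EndpointUriSketch timer = parse("timer:periodic?period=5s");
        System.out.println(timer.scheme + " | " + timer.path + " | " + timer.options);
        // prints: timer | periodic | {period=5s}
    }
}
```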
For a filter, you might use a scripting language to filter events/messages. Something like:
from("timer:periodic?period=5s")
    .filter(simple("${in.body} contains '5'"))
    .to("log:com.redhat.poc.camel.MyRouteBuilder?level=INFO");
We could even create a "FizzBuzz" implementation using Camel pretty easily:
from("timer:simple?period=503") // Create a new timer event every 503 milliseconds
    .id("simple-route")
    .transform()
        .exchange(this::countIncrementer) // Set the body of the event to the next increment of a counter
    .choice()
        .when()
            .body(Integer.class, b -> ((b % 3) == 0) && ((b % 5) == 0))
            .setBody(constant("FizzBuzz"))
            .to("stream:out")
        .when()
            .body(Integer.class, b -> ((b % 3) == 0))
            .setBody(constant("Fizz"))
            .to("stream:out")
        .when()
            .body(Integer.class, b -> ((b % 5) == 0))
            .setBody(constant("Buzz"))
            .to("stream:out")
        .otherwise()
            .to("stream:out")
    .endChoice();
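The route above relies on a countIncrementer method that is not shown, and on the when() branches being tested in order so that the first match wins. The plain-Java sketch below mirrors that logic without any Camel types. FizzBuzzSketch, nextCount, and classify are hypothetical names; the real countIncrementer would receive a Camel Exchange, and its exact signature is an assumption left out here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java mirror of the route's logic; no Camel types involved.
class FizzBuzzSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Hypothetical counterpart of countIncrementer: returns 1, 2, 3, ... on successive calls.
    int nextCount() {
        return counter.incrementAndGet();
    }

    // Mirrors the choice() block: branches are tested top to bottom and the first match wins,
    // which is why the combined "divisible by 3 and 5" test has to come first.
    static String classify(int n) {
        if (n % 3 == 0 && n % 5 == 0) {
            return "FizzBuzz";
        }
        if (n % 3 == 0) {
            return "Fizz";
        }
        if (n % 5 == 0) {
            return "Buzz";
        }
        return String.valueOf(n); // the otherwise() branch passes the number through unchanged
    }

    public static void main(String[] args) {
        FizzBuzzSketch sketch = new FizzBuzzSketch();
        // Stand-in for 15 timer events.
        for (int i = 0; i < 15; i++) {
            System.out.println(classify(sketch.nextCount()));
        }
    }
}
```

If the "Fizz" test came first, 15 would be reported as "Fizz" and the "FizzBuzz" branch would never be reached.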
- SOAP - A simplified way of leveraging Apache CXF to read/write SOAP messages
String WS_URI = "cxf://http://myserver/customerservice?serviceClass=com.example.customerservice&dataFormat=RAW";
SoapJaxbDataFormat soapDF = new SoapJaxbDataFormat("com.example.customerservice", new ServiceInterfaceStrategy(CustomerService.class));

from("direct:customerServiceClient")
    .onException(Exception.class)
        .handled(true)
        .unmarshal(soapDF)
    .end()
    .marshal(soapDF)
    .to(WS_URI)
    .unmarshal(soapDF);
- JAXB - Marshal/Unmarshal XML data to/from POJOs/Beans
from("activemq:My.Queue")
    .unmarshal(jaxb)
    .to("mqseries:Another.Queue");
- JDBC - Read from or write to relational databases using JDBC
from("direct:projects")
    .setHeader("lic", constant("ASF"))
    .setHeader("min", constant(123))
    // setBody() expects an expression, so the SQL text is wrapped in constant()
    .setBody(constant("select * from projects where license = :?lic and id > :?min order by id"))
    .to("jdbc:myDataSource?useHeadersAsParameters=true");
- Kafka - Read/Write Kafka streams
from("kafka:test?brokers=localhost:9092")
    .log("Message received from Kafka : ${body}")
    .log(" on the topic ${headers[kafka.TOPIC]}")
    .log(" on the partition ${headers[kafka.PARTITION]}")
    .log(" with the offset ${headers[kafka.OFFSET]}")
    .log(" with the key ${headers[kafka.KEY]}");

from("direct:start")
    .setBody(constant("Message from Camel"))          // Message to send
    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
    .to("kafka:test?brokers=localhost:9092");
- Mock - Mock producers and endpoints
- We start with a Camel Route which has injected properties for the source/destination endpoint URIs:
@ApplicationScoped
public class JMSRouteBuilder extends RouteBuilder {

    @ConfigProperty(name = "jms.example.queue.inbound", defaultValue = "jms:queue:XML.Topic")
    String jmsQueueInUri;

    @ConfigProperty(name = "jms.example.queue.outbound", defaultValue = "jms:queue:JSON.Topic")
    String jmsQueueOutUri;

    @Override
    public void configure() throws Exception {
        from(jmsQueueInUri)
            .unmarshal().jacksonxml(HashMap.class)
            .marshal().json(true)
            .log("Input Received")
            .to(jmsQueueOutUri);
    }
}
- Then we have a Test class which instantiates our route/context with "mock" endpoints
public class JMSRouteBuilderTest extends CamelTestSupport {

    @Override
    public String isMockEndpoints() {
        return "*";
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        // Instantiate the route builder
        JMSRouteBuilder jmsRouteBuilder = new JMSRouteBuilder();

        // Set the source/destination routes to be our "mock" endpoints
        jmsRouteBuilder.jmsQueueInUri = "direct:start";
        jmsRouteBuilder.jmsQueueOutUri = "mock:dest";

        // Return the RouteBuilder with our mocks injected manually
        return jmsRouteBuilder;
    }

    @Test
    public void testJMSRoute() throws InterruptedException {
        // Get a reference to the mock destination endpoint
        MockEndpoint jmsJsonOutput = getMockEndpoint("mock:dest");

        // Set our assertion timeout
        jmsJsonOutput.setAssertPeriod(2000);

        // Set our asserted output message body
        jmsJsonOutput.expectedBodiesReceived("{\n \"head\" : {\n \"title\" : \"My Title\"\n },\n \"body\" : \"Value\"\n}");

        // Set our asserted message count
        jmsJsonOutput.expectedMessageCount(1);

        // Insert our source body into the mock source endpoint
        template.sendBody("direct:start", "<html><head><title>My Title</title></head><body>Value</body></html>");

        // Wait for the assertions to be satisfied or until we timeout
        assertMockEndpointsSatisfied();
    }
}