Skip to content

Instantly share code, notes, and snippets.

@cjmamo
Last active January 2, 2016 10:19
Show Gist options
  • Save cjmamo/8289429 to your computer and use it in GitHub Desktop.
Save cjmamo/8289429 to your computer and use it in GitHub Desktop.
Scaling up Mule with Async Request Handling/Continuations
package org.ossandme;
...
// Naive implementation
/**
 * Skeleton of the custom intercepting message processor used in this
 * walkthrough. Only the class and method signatures are shown; the body of
 * {@link #process(MuleEvent)} is elided in this listing.
 */
public class AhcProcessor extends AbstractInterceptingMessageProcessor {
/**
 * Entry point invoked with the event to handle. The implementation is
 * omitted from this listing.
 *
 * @param event the event handed to this processor
 * @return the resulting event (not shown in this listing)
 * @throws MuleException declared by the overridden method
 */
@Override
public MuleEvent process(final MuleEvent event) throws MuleException {
...
}
}
package org.ossandme;
...
// Naive implementation
/**
 * Intercepting message processor that issues a GET to
 * {@code http://www.google.com/} through Apache's asynchronous HTTP client
 * and resumes the flow from the client's completion callback.
 */
public class AhcProcessor extends AbstractInterceptingMessageProcessor {

    /**
     * Starts the asynchronous GET and registers the callback that, once the
     * response arrives, replaces the event's message with the response body,
     * runs the next processor and hands the result to the reply-to handler.
     * The tail of this method is elided in this listing.
     *
     * @param event the event handed to this processor; its message is
     *              replaced from within the callback
     * @throws MuleException declared by the overridden method
     */
    @Override
    public MuleEvent process(final MuleEvent event) throws MuleException {
        final CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
        final HttpGet request = new HttpGet("http://www.google.com/");
        httpclient.start();

        httpclient.execute(request, new FutureCallback<HttpResponse>() {

            @Override
            public void completed(final HttpResponse response) {
                try {
                    // Buffer the whole response body in memory.
                    InputStream input = response.getEntity().getContent();
                    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
                    IOUtils.copy(input, byteOut);

                    // NOTE(review): toString() decodes with the platform default
                    // charset; confirm that matches the response encoding.
                    event.setMessage(new DefaultMuleMessage(byteOut.toString(), event.getMessage().getMuleContext()));

                    // Continue with the rest of the chain, then send the reply.
                    final MuleEvent lastEvent = processNext(event);
                    lastEvent.getReplyToHandler().processReplyTo(lastEvent, lastEvent.getMessage(), event.getReplyToDestination());
                } catch (Exception e) {
                    // Keep the original failure as the cause so it is not lost.
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void failed(final Exception ex) {
                // Only reports the failure on stdout; no reply is sent from here.
                System.out.println(request.getRequestLine() + "->" + ex);
            }

            @Override
            public void cancelled() {
                // Only reports the cancellation on stdout; no reply is sent from here.
                System.out.println(request.getRequestLine() + " cancelled");
            }
        });
...
    }
}
package org.ossandme;
...
// Naive implementation
/**
 * Intercepting message processor that issues a GET to
 * {@code http://www.google.com/} through Apache's asynchronous HTTP client
 * and resumes the flow from the client's completion callback instead of
 * from the thread that called {@link #process(MuleEvent)}.
 */
public class AhcProcessor extends AbstractInterceptingMessageProcessor {

    /**
     * Starts the asynchronous GET and returns immediately. When the response
     * arrives, the callback replaces the event's message with the response
     * body, runs the next processor and hands the result to the reply-to
     * handler.
     *
     * @param event the event handed to this processor; its message is
     *              replaced from within the callback
     * @return always {@code null}; the reply is produced by the callback
     *         (presumably this stops the calling thread from continuing the
     *         flow itself; confirm against the Mule version in use)
     * @throws MuleException declared by the overridden method
     */
    @Override
    public MuleEvent process(final MuleEvent event) throws MuleException {
        // NOTE(review): a new client is created and started on every call and
        // nothing in this method closes it; consider sharing one instance.
        final CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
        final HttpGet request = new HttpGet("http://www.google.com/");
        httpclient.start();

        httpclient.execute(request, new FutureCallback<HttpResponse>() {

            @Override
            public void completed(final HttpResponse response) {
                try {
                    // Buffer the whole response body in memory.
                    InputStream input = response.getEntity().getContent();
                    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
                    IOUtils.copy(input, byteOut);

                    // NOTE(review): toString() decodes with the platform default
                    // charset; confirm that matches the response encoding.
                    event.setMessage(new DefaultMuleMessage(byteOut.toString(), event.getMessage().getMuleContext()));

                    // Continue with the rest of the chain, then send the reply.
                    final MuleEvent lastEvent = processNext(event);
                    lastEvent.getReplyToHandler().processReplyTo(lastEvent, lastEvent.getMessage(), event.getReplyToDestination());
                } catch (Exception e) {
                    // Keep the original failure as the cause so it is not lost.
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void failed(final Exception ex) {
                // Only reports the failure on stdout; no reply is sent from here.
                System.out.println(request.getRequestLine() + "->" + ex);
            }

            @Override
            public void cancelled() {
                // Only reports the cancellation on stdout; no reply is sent from here.
                System.out.println(request.getRequestLine() + " cancelled");
            }
        });

        return null;
    }
}
<mule ...>
<!-- Flow with a request-response HTTP inbound endpoint whose outbound HTTP call is wrapped in an <async> scope. -->
<flow name="...">
<http:inbound-endpoint address="..." exchange-pattern="request-response"/>
<!-- The outbound request-response call is placed inside the async scope. -->
<async>
<http:outbound-endpoint address="..." exchange-pattern="request-response"/>
</async>
...
</flow>
</mule>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
...
xmlns:jetty="http://www.mulesoft.org/schema/mule/jetty"
xsi:schemaLocation="
...
http://www.mulesoft.org/schema/mule/jetty http://www.mulesoft.org/schema/mule/jetty/current/mule-jetty.xsd">
<!-- Jetty connector named "test" with useContinuations switched on. -->
<jetty:connector name="test" useContinuations="true"/>
<!-- Flow "main" receives request-response traffic through a Jetty inbound endpoint on http://localhost:8081. -->
<flow name="main">
<jetty:inbound-endpoint address="http://localhost:8081" exchange-pattern="request-response"/>
...
</flow>
</mule>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
...
xmlns:jetty="http://www.mulesoft.org/schema/mule/jetty"
xsi:schemaLocation="
...
http://www.mulesoft.org/schema/mule/jetty http://www.mulesoft.org/schema/mule/jetty/current/mule-jetty.xsd">
<!-- Jetty connector named "test" with useContinuations switched on. -->
<jetty:connector name="test" useContinuations="true"/>
<!-- Flow "main": Jetty inbound endpoint, then the custom AhcProcessor, then an INFO log line. -->
<flow name="main">
<jetty:inbound-endpoint address="http://localhost:8081" exchange-pattern="request-response"/>
<!-- Custom processor class org.ossandme.AhcProcessor is plugged into the flow here. -->
<custom-processor class="org.ossandme.AhcProcessor"/>
<logger level="INFO" message="Reply from service received" />
...
</flow>
</mule>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment