If you want to just run the code, check out either of these two branches:
To make a request that will poke the code path I edited, do this:
curl -v 'http://localhost:8080/api/new-store/proxy-store-ccs' -H 'Origin: http://localhost:8080' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: en-US,en;q=0.8' -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36' -H 'Content-Type: application/json;charset=UTF-8' -H 'Accept: application/json, text/plain, */*' -H 'Referer: http://localhost:8080/newStore.html' -H 'Cookie: optimizelyEndUserId=oeu1464711254770r0.09941448612711756; optimizelySegments=%7B%225935862247%22%3A%22direct%22%2C%225943910249%22%3A%22none%22%2C%225940031114%22%3A%22false%22%2C%225931541813%22%3A%22gc%22%7D; optimizelyBuckets=%7B%7D; _ga=GA1.1.795951785.1464711256; __utma=111872281.795951785.1464711256.1464711256.1464711256.1; __utmz=111872281.1464711256.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); Idea-61c85cf7=51b40123-3c76-42f9-be05-a801ab868f65' -H 'Connection: keep-alive' --data-binary '{"location-number":"551","merchandise-hierarchy-guids":["e3bf3690-122c-4ca0-8ab6-2c19eafd4d30","e2276dfb-e299-429e-a3c6-86c6488c93f1","d7208623-4301-4064-9d01-9bcdce9bbbd4","fe1952b9-912c-46bc-a902-ac5645c04579","a539755c-05c0-42f5-8469-a63e225d1276","c916cf02-cc86-41ba-b33b-3e64a281b939","ce085a1d-aad8-4f72-9229-ba41901f22d1","0e325ab7-7b49-43d6-9bb0-783c68502a53","b458966e-27a3-4931-9cc2-49eea238e909","d2118e82-ea56-4c98-b1c2-2bb23512372a","35ea78be-0cec-4de5-81d9-1cbafe7d61bc","99dafb1d-9125-462d-bb23-ce65fc26a492","30588474-9877-4b22-9fc4-f893cd2182b8","86e7b55b-fb98-4f6f-bb4c-bb7146dfdeb0","eaef00b6-b60a-49de-b87e-5d8944cc35ac","b9f4ab93-a7f0-420d-90e6-34992fc985d4","dd114b82-a652-4185-a4c4-640f7cb7629a","00d43c1a-2e02-40a3-936d-28a02816aad6","d52c3dd8-8943-48b5-b696-0f242d1983a0"],"brand":"4","market":"US","channel":"RTL"}' --compressed
When the AngularJS Allocation Web client makes a request to Allocation Web that requires data from more than one upstream service call, we would like to execute these requests concurrently. We would like to ensure that - in executing these requests - we do not severely impact the performance of the Spring application.
Some other things to have that would be nice:
- We would like some way to be notified when an exception is thrown during the execution of an HTTP request
- We might want some way to kill in-progress tasks if some other task failed
This approach relies upon:
RestTemplate
ForkJoinPool
parallelStream()
andflatMap
This approach:
- Allows us fine-grained control over the pool size (open question: any way to share a pool across inbound requests?)
- Catches the first
Exception
thrown in aForkJoinTask
, stops theForkJoinPool
, and rethrows - Does not kill the in-process HTTP requests
public List<CatalogCustomerChoice> ccsByMultipleMerchandiseHierarchyGuidsAndLocationIdForkJoinPool(
List<String> mhGuids,
String locationId
) throws Throwable {
ForkJoinPool forkJoinPool = new ForkJoinPool(5);
try {
return forkJoinPool.submit(() -> {
return mhGuids.parallelStream().flatMap(guid -> {
CatalogSearchParams clientSearch = CatalogSearchParams
.builder()
.mhGuid(guid)
.locationId(locationId)
.build();
return allocationClient.searchCatalogCustomerChoices(clientSearch).parallelStream();
}).collect(Collectors.toList());
}).get();
} catch (Exception e1) {
forkJoinPool.shutdownNow();
try {
throw e1;
} catch (ExecutionException e2) {
throw e2.getCause();
}
}
}
No
Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().
https://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/
This approach relies upon:
AsyncRestTemplate
with a thread pool of size 5CompletableFuture.allOf
to wait until allCompletableFuture
s have finishedListenableFuture
mapped toCompletableFuture
(see Appendix A)parallelize
utility method (see Appendix B)
This approach:
- Immediately completes the outer
CompletableFuture
with any exception thrown during processing of innerCompletableFuture
s but does not complete the inner promises - Allows us to share an
AsyncRestTemplate
-level thread pool - Requires us to write a lot of utility code (or use someone's lib)
public List<CatalogCustomerChoice> ccsByMultipleMerchandiseHierarchyGuidsAndLocationIdFutures(
List<String> mhGuids,
String locationId
) throws Throwable {
List<CompletableFuture<List<CatalogCustomerChoice>>> futures = mhGuids
.stream()
.map(guid -> {
AllocationClient.CatalogSearchParams clientSearch = AllocationClient.CatalogSearchParams
.builder()
.mhGuid(guid)
.locationId(locationId)
.build();
return allocationClient.asyncSearchCatalogCustomerChoices(clientSearch);
}).collect(toList());
List<List<CatalogCustomerChoice>> resolved;
try {
resolved = parallelize(futures).get();
} catch (ExecutionException e) {
throw e.getCause();
}
return resolved
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
package com.gap.plan.allocation.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.Executors;
@Configuration
public class RestConfig {
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public AsyncRestTemplate asyncRestTemplate() {
return new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5)));
}
}
static <T> CompletableFuture<T> listenableToCompletableFuture(final ListenableFuture<T> listenableFuture) {
CompletableFuture<T> completable = new CompletableFuture<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = listenableFuture.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}
};
listenableFuture.addCallback(new ListenableFutureCallback<T>() {
@Override
public void onSuccess(T result) {
completable.complete(result);
}
@Override
public void onFailure(Throwable t) {
completable.completeExceptionally(t);
}
});
return completable;
}
private <T> CompletableFuture<List<T>> parallel(List<CompletableFuture<T>> cfs) {
@SuppressWarnings({"unchecked", "rawtypes"}) // generic array creation
final CompletableFuture<T>[] arr = cfs.toArray(new CompletableFuture[cfs.size()]);
CompletableFuture<List<T>> cfl = CompletableFuture.allOf(arr)
.thenApply(v -> cfs.stream()
.map(CompletableFuture::join)
.collect(toList())
);
cfs.forEach(cf -> {
cf.whenComplete((t, ex) -> {
if (ex != null) {
cfl.completeExceptionally(ex);
}
});
});
return cfl;
}