@laser
Last active July 29, 2016 21:01
Concurrency Stuff

If you want to just run the code, check out either of these two branches:

To make a request that will poke the code path I edited, do this:

curl -v 'http://localhost:8080/api/new-store/proxy-store-ccs' -H 'Origin: http://localhost:8080' -H 'Accept-Encoding: gzip, deflate' -H 'Accept-Language: en-US,en;q=0.8' -H 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36' -H 'Content-Type: application/json;charset=UTF-8' -H 'Accept: application/json, text/plain, */*' -H 'Referer: http://localhost:8080/newStore.html' -H 'Cookie: optimizelyEndUserId=oeu1464711254770r0.09941448612711756; optimizelySegments=%7B%225935862247%22%3A%22direct%22%2C%225943910249%22%3A%22none%22%2C%225940031114%22%3A%22false%22%2C%225931541813%22%3A%22gc%22%7D; optimizelyBuckets=%7B%7D; _ga=GA1.1.795951785.1464711256; __utma=111872281.795951785.1464711256.1464711256.1464711256.1; __utmz=111872281.1464711256.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); Idea-61c85cf7=51b40123-3c76-42f9-be05-a801ab868f65' -H 'Connection: keep-alive' --data-binary '{"location-number":"551","merchandise-hierarchy-guids":["e3bf3690-122c-4ca0-8ab6-2c19eafd4d30","e2276dfb-e299-429e-a3c6-86c6488c93f1","d7208623-4301-4064-9d01-9bcdce9bbbd4","fe1952b9-912c-46bc-a902-ac5645c04579","a539755c-05c0-42f5-8469-a63e225d1276","c916cf02-cc86-41ba-b33b-3e64a281b939","ce085a1d-aad8-4f72-9229-ba41901f22d1","0e325ab7-7b49-43d6-9bb0-783c68502a53","b458966e-27a3-4931-9cc2-49eea238e909","d2118e82-ea56-4c98-b1c2-2bb23512372a","35ea78be-0cec-4de5-81d9-1cbafe7d61bc","99dafb1d-9125-462d-bb23-ce65fc26a492","30588474-9877-4b22-9fc4-f893cd2182b8","86e7b55b-fb98-4f6f-bb4c-bb7146dfdeb0","eaef00b6-b60a-49de-b87e-5d8944cc35ac","b9f4ab93-a7f0-420d-90e6-34992fc985d4","dd114b82-a652-4185-a4c4-640f7cb7629a","00d43c1a-2e02-40a3-936d-28a02816aad6","d52c3dd8-8943-48b5-b696-0f242d1983a0"],"brand":"4","market":"US","channel":"RTL"}' --compressed

Things We Want

When the AngularJS Allocation Web client makes a request to Allocation Web that requires data from more than one upstream service call, we would like to execute those upstream calls concurrently. In doing so, we must not severely degrade the performance of the Spring application.

Some other nice-to-haves:

  1. We would like some way to be notified when an exception is thrown during the execution of an HTTP request
  2. We might want some way to kill in-progress tasks if some other task failed

Examples

Parallel Stream + ForkJoinPool

This approach relies upon:

  • RestTemplate
  • ForkJoinPool
  • parallelStream() and flatMap

This approach:

  • Gives us fine-grained control over the pool size (open question: is there any way to share a pool across inbound requests?)
  • Catches the first Exception thrown in a ForkJoinTask, stops the ForkJoinPool, and rethrows the underlying cause
  • Does not abort HTTP requests that are already in flight

public List<CatalogCustomerChoice> ccsByMultipleMerchandiseHierarchyGuidsAndLocationIdForkJoinPool(
        List<String> mhGuids,
        String locationId
) throws Throwable {
    // A dedicated pool caps the number of concurrent upstream calls at 5.
    ForkJoinPool forkJoinPool = new ForkJoinPool(5);

    try {
        // Submitting the stream pipeline to the pool makes its parallel tasks run there.
        return forkJoinPool.submit(() -> {
            return mhGuids.parallelStream().flatMap(guid -> {
                CatalogSearchParams clientSearch = CatalogSearchParams
                        .builder()
                        .mhGuid(guid)
                        .locationId(locationId)
                        .build();
                // flatMap consumes each inner stream sequentially, so stream() suffices here.
                return allocationClient.searchCatalogCustomerChoices(clientSearch).stream();
            }).collect(Collectors.toList());
        }).get();
    } catch (ExecutionException e) {
        // Rethrow the exception raised by the failed task rather than the wrapper.
        throw e.getCause();
    } finally {
        // Stops the pool on failure and releases its threads on success.
        forkJoinPool.shutdownNow();
    }
}
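
One possible answer to the open question about sharing a pool across inbound requests is to hold a single long-lived ForkJoinPool and submit every request's stream to it. The sketch below is only that, a sketch: the class name, the static SHARED_POOL field, and the fetch parameter (a stand-in for allocationClient.searchCatalogCustomerChoices) are all assumptions, and in the Spring app the pool would more likely be a singleton bean. Note that the pool is no longer shut down on failure, since other requests may still be using it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SharedPoolSketch {

    // One pool for the whole application instead of one per call.
    // The size of 5 mirrors the example above; it is not a recommendation.
    private static final ForkJoinPool SHARED_POOL = new ForkJoinPool(5);

    // `fetch` is a hypothetical stand-in for the blocking upstream call.
    public static <K, V> List<V> fetchAll(List<K> keys, Function<K, List<V>> fetch) throws Throwable {
        try {
            return SHARED_POOL.submit(() -> {
                return keys.parallelStream()
                        .flatMap(key -> fetch.apply(key).stream())
                        .collect(Collectors.toList());
            }).get();
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure. The pool stays up,
            // because other requests may still be running on it.
            throw e.getCause();
        }
    }

    public static void main(String[] args) throws Throwable {
        List<String> result = fetchAll(
                Arrays.asList("a", "b", "c"),
                key -> Arrays.asList(key + "1", key + "2"));
        System.out.println(result); // [a1, a2, b1, b2, c1, c2]
    }
}
```

The trade-off is that all inbound requests now compete for the same five threads, so one slow upstream call can delay unrelated requests.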
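
Doesn't parallelStream use the common ForkJoinPool?

Not when the terminal operation is invoked from a task already running in another ForkJoinPool, as it is here. The Stream documentation does not promise this; it follows from how the stream's subtasks are forked, as described in the ForkJoinTask.fork() Javadoc: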

Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().

https://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/
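
A quick way to check this claim is to ask, for every element of a parallel stream, which pool the processing thread belongs to. The sketch below does that with ForkJoinTask.getPool(); the class and method names are made up for illustration, and the result reflects observed JDK behavior rather than a documented guarantee.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WhichPoolSketch {

    // Runs a parallel stream from inside `pool` and reports, per element,
    // whether the thread that processed it belongs to that pool.
    public static List<Boolean> ranInPool(ForkJoinPool pool) throws Exception {
        return pool.submit(() -> {
            return IntStream.range(0, 100)
                    .parallel()
                    // getPool() returns the pool hosting the current thread,
                    // or null when called outside any ForkJoinPool.
                    .mapToObj(i -> ForkJoinTask.getPool() == pool)
                    .collect(Collectors.toList());
        }).get();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool custom = new ForkJoinPool(5);
        try {
            boolean allInCustom = ranInPool(custom).stream().allMatch(b -> b);
            System.out.println("all elements processed in custom pool: " + allInCustom);
        } finally {
            custom.shutdown();
        }
    }
}
```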

CompletableFuture + AsyncRestTemplate

This approach relies upon:

  • AsyncRestTemplate with a thread pool of size 5
  • CompletableFuture.allOf to wait until all CompletableFutures have finished
  • ListenableFuture mapped to CompletableFuture (see Appendix A)
  • parallelize utility method (see Appendix B)

This approach:

  • Completes the outer CompletableFuture immediately with the first exception thrown by any inner CompletableFuture, but does not cancel the remaining inner futures
  • Allows us to share an AsyncRestTemplate-level thread pool
  • Requires us to write a lot of utility code (or use someone's lib)

public List<CatalogCustomerChoice> ccsByMultipleMerchandiseHierarchyGuidsAndLocationIdFutures(
        List<String> mhGuids,
        String locationId
) throws Throwable {
    List<CompletableFuture<List<CatalogCustomerChoice>>> futures = mhGuids
            .stream()
            .map(guid -> {
                AllocationClient.CatalogSearchParams clientSearch = AllocationClient.CatalogSearchParams
                        .builder()
                        .mhGuid(guid)
                        .locationId(locationId)
                        .build();
                return allocationClient.asyncSearchCatalogCustomerChoices(clientSearch);
            }).collect(toList());

    List<List<CatalogCustomerChoice>> resolved;
    try {
        resolved = parallelize(futures).get();
    } catch (ExecutionException e) {
        throw e.getCause();
    }

    return resolved
            .stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
}

The AsyncRestTemplate bean, backed by a fixed pool of five threads, is configured as follows:

package com.gap.plan.allocation.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.Executors;

@Configuration
public class RestConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public AsyncRestTemplate asyncRestTemplate() {
        return new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5)));
    }
}

Appendix A

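// Adapts a Spring ListenableFuture to a CompletableFuture. Cancelling the
// returned future also cancels the underlying ListenableFuture.
static <T> CompletableFuture<T> listenableToCompletableFuture(final ListenableFuture<T> listenableFuture) {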
    CompletableFuture<T> completable = new CompletableFuture<T>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean result = listenableFuture.cancel(mayInterruptIfRunning);
            super.cancel(mayInterruptIfRunning);
            return result;
        }
    };

    listenableFuture.addCallback(new ListenableFutureCallback<T>() {
        @Override
        public void onSuccess(T result) {
            completable.complete(result);
        }

        @Override
        public void onFailure(Throwable t) {
            completable.completeExceptionally(t);
        }
    });

    return completable;
}

Appendix B

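// Combines the futures into one that yields every result in input order
// and that fails as soon as any input future fails.
private <T> CompletableFuture<List<T>> parallelize(List<CompletableFuture<T>> cfs) {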
    @SuppressWarnings({"unchecked", "rawtypes"}) // generic array creation
    final CompletableFuture<T>[] arr = cfs.toArray(new CompletableFuture[cfs.size()]);

    CompletableFuture<List<T>> cfl = CompletableFuture.allOf(arr)
            .thenApply(v -> cfs.stream()
                    .map(CompletableFuture::join)
                    .collect(toList())
            );

    cfs.forEach(cf -> {
        cf.whenComplete((t, ex) -> {
            if (ex != null) {
                cfl.completeExceptionally(ex);
            }
        });
    });

    return cfl;
}
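
The helper above fails fast but leaves the other futures running, which is the second nice-to-have from "Things We Want". A minimal sketch of a variant that also cancels the siblings follows; the class name and the parallelizeCancelling name are hypothetical. One caveat: cancelling a plain CompletableFuture does not interrupt the work behind it, so this only stops real HTTP requests if each future propagates cancel() to its source, as the wrapper in Appendix A does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CancelSiblingsSketch {

    // Like the helper above, but when any input fails it also cancels the others.
    public static <T> CompletableFuture<List<T>> parallelizeCancelling(List<CompletableFuture<T>> cfs) {
        CompletableFuture<List<T>> all = CompletableFuture
                .allOf(cfs.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> cfs.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        cfs.forEach(cf -> cf.whenComplete((value, ex) -> {
            if (ex != null) {
                // Fail fast with the first exception seen.
                all.completeExceptionally(ex);
                // cancel() has no effect on futures that have already completed.
                cfs.forEach(other -> other.cancel(true));
            }
        }));
        return all;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completes on its own
        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<List<String>> all = parallelizeCancelling(Arrays.asList(slow, failing));

        failing.completeExceptionally(new IllegalStateException("upstream call failed"));

        System.out.println(all.isCompletedExceptionally()); // true
        System.out.println(slow.isCancelled());             // true
    }
}
```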