@esfand
Last active April 27, 2018 11:09
Thread Management in Java 8

Concurrent Package overview

The java.util.concurrent package is a large collection of utilities for developing concurrent programs in Java: concurrent maps, synchronization strategies, blocking queues, thread management, and many more. Thread management is the one we are concerned with here. It all starts with the Executors helper class and the ExecutorService interface. The Executors helper class provides easy methods to create an ExecutorService as either a cached thread pool, which grows on demand as new threads are needed, or a fixed thread pool, which queues tasks once all of its threads are busy. As a developer, all you need to do is submit new tasks; each task is executed in the background once a thread is available. The result of a submission is a Future. With the Future in hand, you can poll for completion of the task or simply wait for it. With only a few lines of code, we can create a reusable, thread-managed system for querying APIs.

import java.util.concurrent.*;

public class ApiService {
    private static final int THREADS = Runtime.getRuntime().availableProcessors();
    private final ExecutorService executor = Executors.newFixedThreadPool(THREADS);

    public void invokeApi(final String path)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<Object> future = executor.submit(new Callable<Object>() {
            public Object call() {
                // query the API (via HttpURLConnection, HTTP Client, commons http, etc)
                // map the API to POJOs (jaxb, JSON mappers, etc)
                Object result = "response for " + path; // placeholder for the mapped response
                return result;
            }
        });

        // wait at most 5 seconds for the task to finish
        Object result = future.get(5, TimeUnit.SECONDS);
        // do stuff with result
    }
}

With these few lines, we ensure that at most THREADS tasks run concurrently. Before the addition of these packages and classes, this type of programming required several libraries and classes.

With Groovy, this becomes even simpler, as you can use closures directly without the new Callable<Object>() { public Object call() { } } boilerplate. Lambda expressions in JDK 8 remove the boilerplate in Java as well.
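A minimal sketch of the lambda form, which needs Java 8 or later. The class name LambdaSubmit and the fetch helper are hypothetical stand-ins for a real API call; they are not part of the original ApiService.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LambdaSubmit {
    // Hypothetical stand-in for the real API call and POJO mapping.
    static String fetch(String path) {
        return "response for " + path;
    }

    static String invokeApi(ExecutorService executor, String path)
            throws InterruptedException, ExecutionException, TimeoutException {
        // The lambda is treated as a Callable<String>; no anonymous class needed.
        Future<String> future = executor.submit(() -> fetch(path));
        return future.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(invokeApi(executor, "/users/1"));
        } finally {
            executor.shutdown(); // let the JVM exit once the task is done
        }
    }
}
```

The executor is passed in as a parameter here only to keep the sketch easy to exercise; a field, as in ApiService, works just as well.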

Now that we have this code in place we can do a few things to improve response times and processing.

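First, if you only need one API invocation per request, the only thing this snippet provides is thread queuing. If you have multiple invocations, you can submit all of the independent ones and then wait on them, so they run concurrently rather than serially. With three independent calls of similar duration, the total wait is roughly that of the slowest call (1x) rather than the sum of all three (3x), provided the pool has enough free threads.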
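The concurrent case might look like the sketch below. It uses ExecutorService.invokeAll, which submits a whole list of tasks and returns when all of them have finished. The three named calls and their 200 ms delays are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentInvocations {
    // Hypothetical API call that takes roughly delayMillis to answer.
    static Callable<String> call(String name, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return name + " done";
        };
    }

    static List<String> fetchAll(ExecutorService executor) throws Exception {
        List<Callable<String>> calls = Arrays.asList(
                call("users", 200), call("orders", 200), call("prices", 200));
        List<String> results = new ArrayList<>();
        // invokeAll returns once every task has completed; the futures come
        // back in the same order as the task list.
        for (Future<String> f : executor.invokeAll(calls)) {
            results.add(f.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            long start = System.nanoTime();
            List<String> results = fetchAll(executor);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // With three pool threads the elapsed time should be close to one
            // call's latency rather than three.
            System.out.println(results + " in ~" + elapsedMs + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```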

Second, if you are using the Servlet 3.0 API, you can pause a request while waiting on the API invocation to complete, which lets the servlet container use the request thread to process other incoming requests. This gives better request-thread utilization and higher throughput for the entire system. I would not recommend this if the expected API processing time and latency are below 250-500ms, as the context switching could become expensive if the servlet container is pausing, processing, and resuming requests over and over.

Call us, as we won't call you: the callback interface

Instead of having to ask whether a task has been done, we may prefer to have the workers inform us when they have completed a job. That is an approach we can take with asynchronous processing in Java too. We will not call future.get() or some other method to ask whether our task has been completed. We instruct the asynchronous worker to come back to us and tell us when it is done. Well, to be more precise: we make calling us back the last step of the job we submit. There is no special magic to it, and no special infrastructure in the Java language for this callback structure. It is a simple design pattern that we apply.

First of all, the task itself is more formally specified, not using a Callable object that is created on the fly but using a formal Class definition:

package future;

import java.util.concurrent.Callable;

public class CallingBackWorker implements Callable {
    private CallbackInterface employer;

    public CallingBackWorker() {
    }

    public Object call() {
        new SlowWorker().doWork();
        employer.returnResult("Task Completed!");
        return null;
    }

    public void setEmployer(CallbackInterface employer) {
        this.employer = employer;
    }

    public CallbackInterface getEmployer() {
        return employer;
    }
}

You will notice that this class expects to have a CallbackInterface set: an employer it will call when the work is done. So in order to make use of this CallingBackWorker (which in turn invokes the SlowWorker) we need to inject it with an implementation of the CallbackInterface.

This interface is as simple as you would expect:

package future;

public interface CallbackInterface {

    public void returnResult(Object result);
}

And one implementation of it is class CalledBack. This class submits a task and then carries on with its own work until the asynchronous CallingBackWorker calls it back when the task is done.

package future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CalledBack implements CallbackInterface{
    Object result;

    public CalledBack() {
    }

    public void returnResult(Object result) {
      System.out.println("Result Received " + result);
      this.result = result;
    }

    public void andAction() {
        ExecutorService es = Executors.newFixedThreadPool(3);
        CallingBackWorker worker = new CallingBackWorker();
        worker.setEmployer(this);
        final Future future = es.submit(worker);
        System.out.println("... try to do something while the work is being done....");
        System.out.println("... and more ....");
        System.out.println("End work" + new java.util.Date());
        // let the submitted task finish, then release the pool threads so the JVM can exit
        es.shutdown();
    }

    public static void main(String[] args) {
        new CalledBack().andAction();
    }

}
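SlowWorker, which CallingBackWorker invokes, is not listed in this text. Any class with a doWork() method that takes a while will do. The version below is a hypothetical stand-in, not the original class: the one-second sleep is an assumption, and the printed messages are modelled on the sample output that follows. To use it with the listings above, add the package future; declaration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: the original SlowWorker is not shown in this text.
public class SlowWorker {
    private static final AtomicInteger NEXT_ID = new AtomicInteger(1);
    private final int id = NEXT_ID.getAndIncrement();

    public void doWork() {
        System.out.println("==== working, working, working ====== (Worker Id = " + id + ")");
        try {
            Thread.sleep(1000); // assumed duration; simulates a slow job
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        System.out.println("==== ready! ======");
    }
}
```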

The output from this process is not spectacular:

... try to do something while the work is being done....
... and more ....
==== working, working, working ====== (Worker Id = 1)
End workWed Feb 18 11:05:11 CET 2009
==== ready! ======
Result Received Task Completed!

but we did not have to ask for the result ourselves, and that is good news!
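On Java 8 the same pattern can be written without a dedicated interface by passing a java.util.function.Consumer as the callback. This is a sketch of that variation, not the approach used above; the class name LambdaCallback, the 200 ms sleep, and the CountDownLatch (there only so the demo can shut down cleanly) are all assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LambdaCallback {
    // Runs the (simulated) slow job on the pool, then hands the result to the callback.
    static void submitWithCallback(ExecutorService pool, Consumer<Object> callback) {
        pool.submit(() -> {
            try {
                Thread.sleep(200); // stands in for the slow work
                callback.accept("Task Completed!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CountDownLatch done = new CountDownLatch(1);

        submitWithCallback(pool, result -> {
            System.out.println("Result Received " + result);
            done.countDown();
        });
        System.out.println("... try to do something while the work is being done....");

        done.await(5, TimeUnit.SECONDS); // only so the demo can shut down cleanly
        pool.shutdown();
    }
}
```

Note that the callback runs on the pool thread, not on the thread that submitted the task, so anything it touches must be safe to use from another thread.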

CompletionService in Java Concurrency

Source

The Future interface provides methods to check whether a computation is complete, to wait for its completion, and to retrieve its result. The result is retrieved with the get method, which blocks until the computation has completed. If a task completes by throwing an exception, the corresponding Future.get rethrows it wrapped in an ExecutionException; if the task was cancelled, get throws CancellationException. The submit method of ExecutorService returns a Future when you submit a Runnable or a Callable. With a plain ExecutorService, you have to keep track of every Future returned by submit so that you can later call get on it.

A CompletionService decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. You can submit Callable tasks to the CompletionService for execution and use the queue-like methods take and poll to retrieve completed results as Futures whenever they become available. ExecutorCompletionService is an implementation of CompletionService that delegates the computation to an Executor. When a task is submitted to an ExecutorCompletionService, it is wrapped in a QueueingFuture, a subclass of FutureTask. QueueingFuture overrides the done method of FutureTask to place the completed task on the BlockingQueue associated with the CompletionService.
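The done hook is easy to try out in isolation. The sketch below is a simplified illustration of the idea, not the JDK source: EnqueueOnDone is a made-up FutureTask subclass that adds itself to a queue when it completes, and the sleep durations are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class DoneHookDemo {
    // Simplified illustration, not the JDK source: a FutureTask that enqueues
    // itself when it completes (normally, exceptionally, or by cancellation).
    static class EnqueueOnDone<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> completed;

        EnqueueOnDone(Callable<V> task, BlockingQueue<Future<V>> completed) {
            super(task);
            this.completed = completed;
        }

        @Override
        protected void done() {
            completed.add(this);
        }
    }

    static String firstCompleted() throws Exception {
        BlockingQueue<Future<String>> completed = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(new EnqueueOnDone<String>(() -> { Thread.sleep(400); return "slow"; }, completed));
            pool.execute(new EnqueueOnDone<String>(() -> { Thread.sleep(50); return "fast"; }, completed));
            // take() hands back whichever task finished first.
            return completed.take().get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstCompleted());
    }
}
```

Because the queue only ever holds finished tasks, calling get on a Future taken from it does not block.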

Let us understand it better with an example. We will submit a few Callable tasks to a CompletionService and then wait for each result (as a Future) using completionService.take(). Whenever we get a Future, we will call Future.get and write the result to the console. You should be familiar with Executor, ExecutorService, Future and FutureTask before working through this example.

Example – CompletionServiceDemo.java

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceDemo {

  public void compServDemo(final Executor exec) {

    CompletionService<String> completionService = new ExecutorCompletionService<>(exec);

    for (int i = 0; i < 4; i++) {
      // Callable task that will be submitted to CompletionService.
      Callable<String> task = new Callable<String>() {

        @Override
        public String call() {
          try {
            System.out.println("Started " + Thread.currentThread() + " at " + new Date());
            Thread.sleep(5000);
            System.out.println("Exiting " + Thread.currentThread() + " at " + new Date());
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          return Thread.currentThread() + " success!!!";
        }
      };

      // Submitting the task to the CompletionService
      completionService.submit(task);
    }

    // Waiting for the results and printing them
    for (int i = 0; i < 4; i++) {
      Future<String> f;
      try {
        f = completionService.take();
        System.out.println("RESULT=" + f.get() + " at " + new Date());
      } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
      }
    }
  }

 

  public static void main(String[] args) {

    CompletionServiceDemo cSD = new CompletionServiceDemo();
    cSD.compServDemo(Executors.newFixedThreadPool(2));
  }
}

OUTPUT

Started Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:16 IST 2013
Started Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:16 IST 2013
Exiting Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:20 IST 2013
Exiting Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:20 IST 2013
Started Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:20 IST 2013
Started Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:20 IST 2013
RESULT=Thread[pool-1-thread-1,5,main] success!!! at Wed Jul 10 10:57:20 IST 2013
RESULT=Thread[pool-1-thread-2,5,main] success!!! at Wed Jul 10 10:57:20 IST 2013
Exiting Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:23 IST 2013
Exiting Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:23 IST 2013
RESULT=Thread[pool-1-thread-1,5,main] success!!! at Wed Jul 10 10:57:23 IST 2013
RESULT=Thread[pool-1-thread-2,5,main] success!!! at Wed Jul 10 10:57:23 IST 2013

Note

You can also use ExecutorService instead of Executor in the above example if you need access to lifecycle methods:

public void compServDemo(final ExecutorService exec) {
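A sketch of what access to the lifecycle methods allows: once all results have been collected, the method can shut the pool down itself. The class name LifecycleDemo, the squaring tasks, and the five-second timeout are illustrative assumptions.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LifecycleDemo {
    // Submits `tasks` jobs, sums their results, then shuts the pool down.
    static int runAndShutdown(ExecutorService exec, int tasks) throws Exception {
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(exec);
        for (int i = 1; i <= tasks; i++) {
            final int n = i;
            completionService.submit(() -> n * n);
        }
        int sum = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                sum += completionService.take().get();
            }
        } finally {
            exec.shutdown(); // stop accepting new tasks; queued ones still run
            if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
                exec.shutdownNow(); // interrupt anything still running
            }
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndShutdown(Executors.newFixedThreadPool(2), 4));
    }
}
```

Without the shutdown call, the pool's non-daemon threads keep the JVM alive after main returns.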

ExecutorCompletionService in practice

Source

While everyone is talking about the future of Java, we continue our journey explaining the Future interface in Java. The ExecutorCompletionService wrapper class tries to address one of the biggest deficiencies of the Future type: no support for callbacks or any event-driven behaviour whatsoever. Let's go back for a moment to our sample asynchronous task downloading the contents of a given URL:

final ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        try (InputStream input = url.openStream()) {
            return IOUtils.toString(input, StandardCharsets.UTF_8);
        }
    }
});

Having such code we can easily write a simple web search engine/crawler, examining several URLs concurrently:

final List<String> topSites = Arrays.asList(
        "www.google.com", "www.youtube.com", "www.yahoo.com", "www.msn.com",
        "www.wikipedia.org", "www.baidu.com", "www.microsoft.com", "www.qq.com",
        "www.bing.com", "www.ask.com", "www.adobe.com", "www.taobao.com",
        "www.youku.com", "www.soso.com", "www.wordpress.com", "www.sohu.com",
        "www.windows.com", "www.163.com", "www.tudou.com", "www.amazon.com"
);
 
final ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<String>> contentsFutures = new ArrayList<>(topSites.size());
for (final String site : topSites) {
    final Future<String> contentFuture = pool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8);
        }
    });
    contentsFutures.add(contentFuture);
}

As easy as that. We simply submit a separate task for each web site to a pool and wait for the results. To achieve that we collect all Future objects into a collection and iterate through them:

for (Future<String> contentFuture : contentsFutures) {
    final String content = contentFuture.get();
    //...process contents
}

Each call to contentFuture.get() waits until the download of the given web site is finished (remember that each Future represents one site). This works, but has a major bottleneck. Imagine you have as many threads in the pool as tasks (20 sites in this case). Understandably, you want to start processing the contents of web sites as soon as they arrive, no matter which one comes first. Response times vary greatly, so don't be surprised to find some web sites responding within a second while others need as long as 20 seconds. But here's the problem: after submitting all the tasks we block on the first Future in the list, and there is no guarantee that this Future will complete first. It is very likely that other Future objects have already completed and are ready for processing, but we keep hanging on that first one. In the worst-case scenario, if the first submitted page is slower by an order of magnitude than all the others, every result except the first is ready and sitting idle while we keep waiting for the first one.

The obvious solution would be to sort the web sites from fastest to slowest and submit them in that order. The Futures would then be expected to complete in the order in which we submitted them. But this is impractical and almost impossible in real life due to the dynamic nature of the web.

This is where ExecutorCompletionService steps in. It is a thin wrapper around ExecutorService that "remembers" all submitted tasks and allows you to wait for the first completed task, as opposed to the first submitted one. In a way, ExecutorCompletionService keeps a handle to all intermediate Future objects, and once any of them finishes, it is returned. The crucial API method is CompletionService.take(), which blocks and waits for any underlying Future to complete. Here is the submit step with ExecutorCompletionService:

final ExecutorService pool = Executors.newFixedThreadPool(5);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(pool);
for (final String site : topSites) {
    completionService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8);
        }
    });
}

Notice how we seamlessly switched to completionService. Now the retrieval step:

for(int i = 0; i < topSites.size(); ++i) {
    final Future<String> future = completionService.take();
    try {
        final String content = future.get();
        //...process contents
    } catch (ExecutionException e) {
        log.warn("Error while downloading", e.getCause());
    }
}

You might be wondering why we need an extra counter? Unfortunately ExecutorCompletionService doesn't tell you how many Future objects are still there waiting so you must remember how many times to call take().

This solution feels much more robust. We process responses as soon as they are ready. take() blocks until the fastest of the still-running tasks finishes. And if processing takes a little longer and multiple responses have finished in the meantime, the subsequent call to take() returns immediately. It's fun to observe the program when the number of pool threads is as big as the number of tasks, so that we begin downloading every site at the same time. You can easily see which web sites have the shortest response times and which respond very slowly.
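The effect is easy to reproduce without any network access. In the sketch below, Thread.sleep stands in for download time and the latencies are invented; the pool has one thread per task, as in the experiment just described.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {
    // Submits one task per simulated latency and returns the results in the order
    // take() delivers them, i.e. completion order rather than submission order.
    static List<Long> completionOrder(long... latenciesMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(latenciesMillis.length);
        CompletionService<Long> completionService = new ExecutorCompletionService<>(pool);
        try {
            for (final long latency : latenciesMillis) {
                completionService.submit(() -> {
                    Thread.sleep(latency); // simulated download time
                    return latency;
                });
            }
            List<Long> order = new ArrayList<>();
            for (int i = 0; i < latenciesMillis.length; i++) {
                order.add(completionService.take().get());
            }
            return order;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted slowest first; the results are expected back fastest first.
        System.out.println(completionOrder(600, 100, 300));
    }
}
```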

ExecutorCompletionService seems wonderful, but in fact it is quite limited. You cannot use it with an arbitrary collection of Future objects that you happened to obtain somehow; it works only with the Executor abstraction. There is also no built-in support for processing incoming results concurrently. If we want to parse results concurrently, we need to manually submit them to a second thread pool. In the next few articles I will show you more powerful constructs that mitigate these disadvantages.
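The manual hand-off to a second pool could be sketched as follows. The download and parse methods are hypothetical placeholders that do no real I/O, and the pool sizes are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoStagePipeline {
    // Hypothetical stand-ins for downloading and parsing.
    static String download(String site) { return "<html>" + site + "</html>"; }
    static int parse(String content) { return content.length(); }

    static int totalLength(List<String> sites) throws Exception {
        ExecutorService downloadPool = Executors.newFixedThreadPool(4);
        ExecutorService parsePool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> downloads = new ExecutorCompletionService<>(downloadPool);
            for (final String site : sites) {
                downloads.submit(() -> download(site));
            }
            // As each download completes, hand its content to the parsing pool.
            List<Future<Integer>> parsed = new ArrayList<>();
            for (int i = 0; i < sites.size(); i++) {
                final String content = downloads.take().get();
                parsed.add(parsePool.submit(() -> parse(content)));
            }
            int total = 0;
            for (Future<Integer> f : parsed) {
                total += f.get();
            }
            return total;
        } finally {
            downloadPool.shutdown();
            parsePool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(totalLength(Arrays.asList("a.example", "bb.example")));
    }
}
```

The loop that calls take() still runs on a single thread; only the parsing itself is spread across the second pool.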

Concurrency in Java 8

Source

Java 8 is expected to reach general availability status in March 2014. Although this release will likely be celebrated (and remembered) for introducing Lambda expressions to the Java language, it includes other new features that will help to improve developer productivity. Consider the following enhancements to the Java Concurrency Utilities:

  • Improved ConcurrentHashMap class: The java.util.concurrent.ConcurrentHashMap class has been improved to make it and classes that use it more useful as caches. New methods include sequential and parallel bulk operations (forEach, search, and reduce).

  • Fence intrinsics: This update exposes memory fence controls into Java code so that the Java Concurrency Utilities APIs can more accurately and efficiently control memory ordering and bounding. This task is accomplished by adding "three memory-ordering intrinsics to the sun.misc.Unsafe class."

  • Changes to the Fork/Join framework: The ForkJoinPool and ForkJoinTask classes have been updated to improve performance and supply additional functionality. "New features include support for completion-based designs that are often most appropriate for IO-bound usages, among others." Additionally, a new CountedCompleter class that subclasses ForkJoinTask and provides a completion action that's "performed when triggered and there are no remaining pending actions" has been introduced.

  • New CompletableFuture class: The new java.util.concurrent.CompletableFuture class is a "Future that may be explicitly completed (setting its value and status), and may include dependent functions and actions that trigger upon its completion." This class is associated with the new java.util.concurrent.CompletableFuture.AsynchronousCompletionTask interface and the new java.util.concurrent.CompletionException exception. Check out Tomasz Nurkiewicz's Java 8: Definitive guide to CompletableFuture blog post for an extensive tutorial on how to use CompletableFuture.

  • New StampedLock class: The new java.util.concurrent.locks.StampedLock class is "a capability-based lock with three modes for controlling read/write access." Check out Dr. Heinz Kabutz's Phaser and StampedLock Concurrency Synchronizers [video presentation](http://parleys.com/play/5148922b0364bc17fc56ca4f/chapter8/about) to learn about StampedLock. A [PDF file](http://www.jfokus.se/jfokus13/preso/jf13_PhaserAndStampedLock.pdf) of this presentation is also available.

  • Parallel array sorting: The java.util.Arrays class has been augmented with several parallel-prefixed class methods (such as void parallelSort(int[] a)) that leverage the Fork/Join framework to sort arrays in parallel.

  • Scalable updatable variables: The java.util.concurrent.atomic package includes new DoubleAccumulator, DoubleAdder, LongAccumulator, and LongAdder classes that address a scalability problem in the context of maintaining a single count, sum, or some other value with the possibility of updates from many threads. These new classes "internally employ contention-reduction techniques that provide huge throughput improvements as compared to atomic variables. This is made possible by relaxing atomicity guarantees in a way that is acceptable in most applications."
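A short tour of four of the additions listed above: a ConcurrentHashMap bulk operation, LongAdder, Arrays.parallelSort, and CompletableFuture. This is a sketch for orientation only; the class name, the sample data, and the parallelism threshold of 1 are arbitrary choices.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class Java8ConcurrencyTour {
    // ConcurrentHashMap bulk operation: sum all values; the first argument is
    // the map size above which the work may be split across threads.
    static int sumValues(ConcurrentHashMap<String, Integer> map) {
        Integer sum = map.reduceValues(1, Integer::sum);
        return sum == null ? 0 : sum; // reduceValues returns null for an empty map
    }

    // LongAdder: many threads increment one counter with little contention.
    static long countInParallel(int increments) {
        LongAdder counter = new LongAdder();
        IntStream.range(0, increments).parallel().forEach(i -> counter.increment());
        return counter.sum();
    }

    // CompletableFuture: attach a dependent step instead of blocking on get().
    static CompletableFuture<Integer> lengthOf(String text) {
        return CompletableFuture.supplyAsync(() -> text).thenApply(String::length);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        hits.put("a", 1);
        hits.put("b", 2);
        System.out.println("sum of values: " + sumValues(hits));

        System.out.println("counter: " + countInParallel(10_000));

        int[] data = {5, 3, 9, 1};
        Arrays.parallelSort(data); // Fork/Join-backed sort
        System.out.println("sorted: " + Arrays.toString(data));

        // join() waits for the result here only so the demo can print it.
        System.out.println("length: " + lengthOf("hello").join());
    }
}
```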
