Last active
August 29, 2015 14:11
-
-
Save radium226/8f847a985d2a3ba4bd9a to your computer and use it in GitHub Desktop.
Map a heavy function over a List using a pool of threads with the help of Rx.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.List; | |
| import java.util.Arrays; | |
| import java.util.Random; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.logging.Level; | |
| import java.util.logging.Logger; | |
| import rx.Observable; | |
| import rx.schedulers.Schedulers; | |
| public class MapListUsingRx { | |
| final private static Logger LOGGER = Logger.getGlobal(); | |
| // Used by the randomInt function | |
| final public static Random RANDOM = new Random(); | |
| // It will be used by Rx to apply the heavy function to some integers | |
| final public static int THREAD_POOL_SIZE = 3; | |
| final public static ExecutorService THREAD_POOL = Executors.newFixedThreadPool(THREAD_POOL_SIZE); | |
| public static void main(String[] arguments) throws Throwable { | |
| // We create the Observable object using the list below. | |
| final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12); | |
| Iterable<Integer> outputs = Observable.<Integer>from(inputs) | |
| // We use the flatMap method to convert the Integers into Observable<Integer> usable in the current Subscription | |
| .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(THREAD_POOL))) | |
| // In order to obtain an Iterable | |
| .toBlocking() | |
| .toIterable(); | |
| // And here we go! | |
| for (Integer output : outputs) { | |
| System.out.println(output); | |
| } | |
| THREAD_POOL.shutdown(); | |
| } | |
| // Here is heavy function which lasts long | |
| public static int doHeavyWeightStuff(int input) { | |
| int duration = randomInt(1, 5); | |
| long threadID = Thread.currentThread().getId(); | |
| String threadName = Thread.currentThread().getName(); | |
| LOGGER.log(Level.INFO, "Doing heavy weight stuff on {4} in {0} ({1}) for {2} second{3}", new Object[] {threadName, threadID, duration, duration > 1 ? "s" : "", input}); | |
| sleepQuietly(duration, TimeUnit.SECONDS); | |
| int output = (int) Math.pow(input, 2); | |
| return output; | |
| } | |
| // We wrap the heavy function execution into a defered Observable | |
| public static Observable<Integer> deferHeavyWeightStuff(final int input) { | |
| return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input))); | |
| } | |
| // Choose a random integer in a given range | |
| public static int randomInt(int min, int max) { | |
| return RANDOM.nextInt((max - min) + 1) + min; | |
| } | |
| // Sleep some time without throwing an InterruptedException | |
| public static void sleepQuietly(int duration, TimeUnit unit) { | |
| try { | |
| Thread.sleep(unit.toMillis(duration)); | |
| } catch (InterruptedException e) { | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment