Skip to content

Instantly share code, notes, and snippets.

@radium226
Last active August 29, 2015 14:11
Show Gist options
  • Select an option

  • Save radium226/8f847a985d2a3ba4bd9a to your computer and use it in GitHub Desktop.

Select an option

Save radium226/8f847a985d2a3ba4bd9a to your computer and use it in GitHub Desktop.
Map a heavy function over a List using a pool of threads with the help of Rx.
import java.util.List;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import rx.Observable;
import rx.schedulers.Schedulers;
public class MapListUsingRx {
final private static Logger LOGGER = Logger.getGlobal();
// Used by the randomInt function
final public static Random RANDOM = new Random();
// It will be used by Rx to apply the heavy function to some integers
final public static int THREAD_POOL_SIZE = 3;
final public static ExecutorService THREAD_POOL = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public static void main(String[] arguments) throws Throwable {
// We create the Observable object using the list below.
final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
Iterable<Integer> outputs = Observable.<Integer>from(inputs)
// We use the flatMap method to convert the Integers into Observable<Integer> usable in the current Subscription
.flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(THREAD_POOL)))
// In order to obtain an Iterable
.toBlocking()
.toIterable();
// And here we go!
for (Integer output : outputs) {
System.out.println(output);
}
THREAD_POOL.shutdown();
}
// Here is heavy function which lasts long
public static int doHeavyWeightStuff(int input) {
int duration = randomInt(1, 5);
long threadID = Thread.currentThread().getId();
String threadName = Thread.currentThread().getName();
LOGGER.log(Level.INFO, "Doing heavy weight stuff on {4} in {0} ({1}) for {2} second{3}", new Object[] {threadName, threadID, duration, duration > 1 ? "s" : "", input});
sleepQuietly(duration, TimeUnit.SECONDS);
int output = (int) Math.pow(input, 2);
return output;
}
// We wrap the heavy function execution into a defered Observable
public static Observable<Integer> deferHeavyWeightStuff(final int input) {
return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
}
// Choose a random integer in a given range
public static int randomInt(int min, int max) {
return RANDOM.nextInt((max - min) + 1) + min;
}
// Sleep some time without throwing an InterruptedException
public static void sleepQuietly(int duration, TimeUnit unit) {
try {
Thread.sleep(unit.toMillis(duration));
} catch (InterruptedException e) {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment