Skip to content

Instantly share code, notes, and snippets.

@nithril
Last active August 29, 2015 14:06
Show Gist options
  • Save nithril/444d8373ce67f0a8b853 to your computer and use it in GitHub Desktop.
Save nithril/444d8373ce67f0a8b853 to your computer and use it in GitHub Desktop.
import org.junit.Assert;
import org.testng.annotations.Test;
import reactor.core.Environment;
import reactor.rx.Stream;
import reactor.rx.spec.Streams;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Exercises parallel consumption of a Reactor {@link Stream} fed in two different ways:
 * from an {@link Iterable} handed to {@code Streams.defer(...)}, and by manually
 * broadcasting each value after the pipeline has been wired.
 *
 * <p>Both tests push the same values through {@code parallel(PARALLELISM)} and count every
 * consumed value on a {@link CountDownLatch}; a test passes only if the latch reaches zero
 * within the timeout.
 *
 * <p>Created by nithril on 06/04/14.
 */
public class Reactor2Test {

    /** Number of integer tasks (0 inclusive to this value exclusive) pushed through the stream. */
    private static final int TASK_COUNT = 1500;

    /** Argument passed to {@code parallel(int)} when fanning the stream out. */
    private static final int PARALLELISM = 4;

    /** Maximum time, in seconds, to wait for every task to be consumed. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5;

    /**
     * Feeds the tasks to the stream by passing the task list directly to
     * {@code Streams.defer(env, dispatcher, tasks)}.
     *
     * <p>NOTE(review): the method name suggests this variant fails intermittently;
     * that is not verifiable from this file alone.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void testRandomlyFailed() throws InterruptedException {
        Environment env = new Environment();
        try {
            List<Integer> tasks = createTasks();
            System.out.println(tasks.size());
            CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
            Stream<Integer> worker = Streams.defer(env, env.getDefaultDispatcherFactory().get(), tasks);
            consumeInParallel(worker, countDownLatch);
            awaitAllConsumed(countDownLatch);
        } finally {
            // Release the environment's resources even when the assertion fails.
            env.shutdown();
        }
    }

    /**
     * Feeds the tasks to the stream by creating an empty deferred stream, wiring the
     * parallel consumers, and only then broadcasting every task with {@code broadcastNext}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void testOk() throws InterruptedException {
        Environment env = new Environment();
        try {
            List<Integer> tasks = createTasks();
            System.out.println(tasks.size());
            CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
            Stream<Integer> worker = Streams.defer(env, env.getDefaultDispatcherFactory().get());
            consumeInParallel(worker, countDownLatch);
            // Values are pushed only after the consumers are attached.
            tasks.forEach(worker::broadcastNext);
            awaitAllConsumed(countDownLatch);
        } finally {
            // Release the environment's resources even when the assertion fails.
            env.shutdown();
        }
    }

    /** Builds the list of integers {@code 0 .. TASK_COUNT - 1} used as the test workload. */
    private static List<Integer> createTasks() {
        return IntStream.range(0, TASK_COUNT).boxed().collect(Collectors.toList());
    }

    /**
     * Splits {@code worker} with {@code parallel(PARALLELISM)} and, on each resulting
     * sub-stream, logs every value in a {@code map} stage and then counts it down on
     * {@code countDownLatch} in the terminal consumer.
     */
    private static void consumeInParallel(Stream<Integer> worker, CountDownLatch countDownLatch) {
        worker.parallel(PARALLELISM).consume(s -> s.map(v -> {
            System.out.println(Thread.currentThread() + " worker " + v);
            return v;
        }).consume(v -> {
            countDownLatch.countDown();
            System.out.println(Thread.currentThread() + " end " + v);
        }));
    }

    /**
     * Waits up to {@link #AWAIT_TIMEOUT_SECONDS} seconds for the latch and asserts that
     * every task was consumed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void awaitAllConsumed(CountDownLatch countDownLatch) throws InterruptedException {
        countDownLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertEquals(
                "tasks still pending after " + AWAIT_TIMEOUT_SECONDS + "s",
                0, countDownLatch.getCount());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment