Skip to content

Instantly share code, notes, and snippets.

@bholota
Created December 6, 2017 13:57
Show Gist options
  • Save bholota/235582bdf4d9445fec9e726ee2ae5707 to your computer and use it in GitHub Desktop.
Save bholota/235582bdf4d9445fec9e726ee2ae5707 to your computer and use it in GitHub Desktop.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
public class Test {
@org.junit.Test
public void rxTest() throws InterruptedException {
int numberOfThreads = 10;
for (int i = 0; i < 100000; i++) {
final PublishSubject<Integer> sub = PublishSubject.create();
final AtomicInteger atomicInteger = new AtomicInteger();
sub.take(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
atomicInteger.incrementAndGet();
}
});
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
List<Thread> threads = new ArrayList<>();
for (int j = 0; j < numberOfThreads; j++) {
final int finalJ = j;
threads.add(new Thread(new Runnable() {
@Override
public void run() {
sub.onNext(finalJ);
latch.countDown();
}
}));
}
threads.forEach(new java.util.function.Consumer<Thread>() {
@Override
public void accept(Thread thread) {
thread.start();
}
});
latch.await();
assert(atomicInteger.get() == 3);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment