Skip to content

Instantly share code, notes, and snippets.

@cattaka
Created September 24, 2016 18:54
Show Gist options
  • Save cattaka/abe164e3c3e6cf42190ab513340d2e0f to your computer and use it in GitHub Desktop.
Save cattaka/abe164e3c3e6cf42190ab513340d2e0f to your computer and use it in GitHub Desktop.
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Created by cattaka on 16/09/24.
 *
 * <p>Demo that feeds integers into a {@code PublishSubject}, hands some of them to a
 * fixed-size {@link ThreadPoolExecutor} for simulated slow work, and republishes each
 * finished value on a second subject that prints it. Values that arrive while every
 * worker is active, or too soon after the previously accepted value, are not submitted
 * to the pool and are therefore never printed.
 */
public class Main {
    /** Core and maximum size of the worker pool. */
    private static final int POOL_SIZE = 4;
    /** Simulated duration of one unit of work; also the upper bound for the measured interval. */
    private static final long WORK_MILLIS = 1000;
    /** Pause between two values emitted by the producer loop. */
    private static final long EMIT_INTERVAL_MILLIS = 100;
    /** Number of values emitted by the producer loop. */
    private static final int ITEM_COUNT = 10000;

    /**
     * Runs the demo: emits {@code ITEM_COUNT} integers, one every
     * {@code EMIT_INTERVAL_MILLIS} milliseconds, then shuts the pool down and waits for
     * the submitted tasks to finish.
     *
     * @param args ignored
     * @throws Exception if waiting for the pool to terminate is interrupted
     */
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 1, TimeUnit.SECONDS, queue);
        PublishSubject<Integer> subject = PublishSubject.create();
        PublishSubject<Integer> finalSubject = PublishSubject.create();

        subject.observeOn(Schedulers.computation())
                .subscribe(new Action1<Integer>() {
                    // Time the most recent value was accepted; only touched inside call().
                    long lastAccept;
                    // Duration of the most recently finished task, capped at WORK_MILLIS.
                    // Written by pool threads and read in call(), hence volatile.
                    volatile long lastInterval;

                    @Override
                    public void call(Integer integer) {
                        long t = System.currentTimeMillis();
                        // Minimum spacing between accepted values: the last task duration
                        // divided across the workers.
                        long x = lastInterval / executor.getMaximumPoolSize();
                        if (executor.getActiveCount() < executor.getMaximumPoolSize()
                                && lastAccept + x <= t) {
                            lastAccept = t;
                            executor.execute(new Runnable() {
                                @Override
                                public void run() {
                                    sleep(WORK_MILLIS);
                                    lastInterval =
                                            Math.min(WORK_MILLIS, System.currentTimeMillis() - t);
                                    // Several pool threads can finish at the same time, and
                                    // notifications to an Rx observer must not overlap, so
                                    // serialize the calls on the subject itself.
                                    synchronized (finalSubject) {
                                        finalSubject.onNext(integer);
                                    }
                                }
                            });
                        }
                    }
                });

        finalSubject.observeOn(Schedulers.computation())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("Value " + integer);
                    }
                });

        for (int i = 0; i < ITEM_COUNT; i++) {
            subject.onNext(i);
            sleep(EMIT_INTERVAL_MILLIS);
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @param l time to sleep, in milliseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while sleeping; the interrupt flag is restored first
     */
    private static void sleep(long l) {
        try {
            Thread.sleep(l);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment