Skip to content

Instantly share code, notes, and snippets.

@Hoobie
Created October 12, 2017 15:00
Show Gist options
  • Save Hoobie/2c6973e633e41997a9fddafb39387114 to your computer and use it in GitHub Desktop.
Save Hoobie/2c6973e633e41997a9fddafb39387114 to your computer and use it in GitHub Desktop.
rxjava-paging-with-backpressure
package com.example.rx;
import rx.*;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class App {
private static final int MAX_INTEGER = 1000000;
public static void main(String[] args) {
Observable.<Integer>unsafeCreate(subscriber ->
subscriber.setProducer(new IntegerProducer((Subscriber<Integer>) subscriber))
).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(127);
}
@Override
public void onCompleted() {
System.out.println("End");
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
request(63);
}
});
}
private static class IntegerProducer implements Producer {
private static final int PAGE_SIZE = 50;
private Deque<Integer> deque = new ConcurrentLinkedDeque<>();
private AtomicInteger offset = new AtomicInteger(0);
private AtomicInteger limit = new AtomicInteger(PAGE_SIZE);
private AtomicLong requestedNo = new AtomicLong(0L);
private AtomicBoolean pagesFinished = new AtomicBoolean();
private AtomicBoolean processing = new AtomicBoolean();
private Subscriber<Integer> subscriber;
private SingleSubscriber<List<Integer>> pagingSubscriber = new SingleSubscriber<List<Integer>>() {
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onSuccess(List<Integer> integersPage) {
if (integersPage.isEmpty()) {
pagesFinished.set(true);
return;
}
offset.addAndGet(PAGE_SIZE);
limit.addAndGet(PAGE_SIZE);
deque.addAll(integersPage);
process();
}
};
IntegerProducer(Subscriber<Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long l) {
requestedNo.addAndGet(l);
process();
}
void process() {
if (deque.isEmpty()) {
getPage(offset, limit).subscribe(pagingSubscriber);
}
if (processing.get()) {
return;
}
if (pagesFinished.get() && deque.isEmpty())
subscriber.onCompleted();
processing.set(true);
while (requestedNo.get() > 0 && !deque.isEmpty()) {
requestedNo.decrementAndGet();
subscriber.onNext(deque.poll());
}
processing.set(false);
}
}
private static Single<List<Integer>> getPage(AtomicInteger offset, AtomicInteger limit) {
if (offset.get() >= MAX_INTEGER) {
return Single.just(Collections.emptyList());
}
return Single.just(IntStream.range(offset.get(), limit.get()).boxed().collect(Collectors.toList()));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment