Created
October 12, 2017 15:00
-
-
Save Hoobie/2c6973e633e41997a9fddafb39387114 to your computer and use it in GitHub Desktop.
rxjava-paging-with-backpressure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.rx; | |
import rx.*; | |
import java.util.Collections; | |
import java.util.Deque; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentLinkedDeque; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.stream.Collectors; | |
import java.util.stream.IntStream; | |
public class App { | |
private static final int MAX_INTEGER = 1000000; | |
public static void main(String[] args) { | |
Observable.<Integer>unsafeCreate(subscriber -> | |
subscriber.setProducer(new IntegerProducer((Subscriber<Integer>) subscriber)) | |
).subscribe(new Subscriber<Integer>() { | |
@Override | |
public void onStart() { | |
request(127); | |
} | |
@Override | |
public void onCompleted() { | |
System.out.println("End"); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
throwable.printStackTrace(); | |
} | |
@Override | |
public void onNext(Integer integer) { | |
System.out.println(integer); | |
request(63); | |
} | |
}); | |
} | |
private static class IntegerProducer implements Producer { | |
private static final int PAGE_SIZE = 50; | |
private Deque<Integer> deque = new ConcurrentLinkedDeque<>(); | |
private AtomicInteger offset = new AtomicInteger(0); | |
private AtomicInteger limit = new AtomicInteger(PAGE_SIZE); | |
private AtomicLong requestedNo = new AtomicLong(0L); | |
private AtomicBoolean pagesFinished = new AtomicBoolean(); | |
private AtomicBoolean processing = new AtomicBoolean(); | |
private Subscriber<Integer> subscriber; | |
private SingleSubscriber<List<Integer>> pagingSubscriber = new SingleSubscriber<List<Integer>>() { | |
@Override | |
public void onError(Throwable throwable) { | |
subscriber.onError(throwable); | |
} | |
@Override | |
public void onSuccess(List<Integer> integersPage) { | |
if (integersPage.isEmpty()) { | |
pagesFinished.set(true); | |
return; | |
} | |
offset.addAndGet(PAGE_SIZE); | |
limit.addAndGet(PAGE_SIZE); | |
deque.addAll(integersPage); | |
process(); | |
} | |
}; | |
IntegerProducer(Subscriber<Integer> subscriber) { | |
this.subscriber = subscriber; | |
} | |
@Override | |
public void request(long l) { | |
requestedNo.addAndGet(l); | |
process(); | |
} | |
void process() { | |
if (deque.isEmpty()) { | |
getPage(offset, limit).subscribe(pagingSubscriber); | |
} | |
if (processing.get()) { | |
return; | |
} | |
if (pagesFinished.get() && deque.isEmpty()) | |
subscriber.onCompleted(); | |
processing.set(true); | |
while (requestedNo.get() > 0 && !deque.isEmpty()) { | |
requestedNo.decrementAndGet(); | |
subscriber.onNext(deque.poll()); | |
} | |
processing.set(false); | |
} | |
} | |
private static Single<List<Integer>> getPage(AtomicInteger offset, AtomicInteger limit) { | |
if (offset.get() >= MAX_INTEGER) { | |
return Single.just(Collections.emptyList()); | |
} | |
return Single.just(IntStream.range(offset.get(), limit.get()).boxed().collect(Collectors.toList())); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment