-
-
Save mttkay/24881a0ce986f6ec4b4d to your computer and use it in GitHub Desktop.
public class Pager<I, O> { | |
private static final Observable FINISH_SEQUENCE = Observable.never(); | |
private PublishSubject<Observable<I>> pages; | |
private Observable<I> nextPage = finish(); | |
private Subscription subscription = Subscriptions.empty(); | |
private final PagingFunction<I> pagingFunction; | |
private final Func1<I, O> pageTransformer; | |
public static <T> Pager<T, T> create(PagingFunction<T> pagingFunction) { | |
return new Pager<>(pagingFunction, UtilityFunctions.<T>identity()); | |
} | |
public static <I, O> Pager<I, O> create(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) { | |
return new Pager<>(pagingFunction, pageTransformer); | |
} | |
Pager(PagingFunction<I> pagingFunction, Func1<I, O> pageTransformer) { | |
this.pagingFunction = pagingFunction; | |
this.pageTransformer = pageTransformer; | |
} | |
/** | |
* Used in the paging function to signal the caller that no more pages are available, i.e. | |
* to finish paging by completing the paged sequence. | |
* | |
* @return the finish token | |
*/ | |
@SuppressWarnings("unchecked") | |
public static <T> Observable<T> finish() { | |
return FINISH_SEQUENCE; | |
} | |
/** | |
* Transforms the given sequence to have its subsequent pages pushed into the observer subscribed | |
* to the new sequence returned by this method. You can advance to the next page by calling {@link #next()} | |
* | |
* @param source the source sequence, which would be the first page of the sequence to be paged | |
* @return a new sequence based on {@code source}, where subscribers keep receiving pages through subsequent calls | |
* to {@link #next()} | |
*/ | |
public Observable<O> page(final Observable<I> source) { | |
return Observable.create(new Observable.OnSubscribe<O>() { | |
@Override | |
public void call(final Subscriber<? super O> subscriber) { | |
pages = PublishSubject.create(); | |
subscription = Observable.switchOnNext(pages).subscribe(new PageSubscriber(subscriber)); | |
subscriber.add(subscription); | |
pages.onNext(source); | |
} | |
}); | |
} | |
/** | |
* Returns the last page received from the pager. You may use this to | |
* retry that observable in case it failed the first time around. | |
*/ | |
public Observable<O> currentPage() { | |
return page(nextPage); | |
} | |
/** | |
* @return true, if there are more pages to be emitted. | |
*/ | |
public boolean hasNext() { | |
return nextPage != FINISH_SEQUENCE; | |
} | |
/** | |
* Advances the pager by pushing the next page of items into the current observer, is there is one. If the pager | |
* has been unsubscribed from or there are no more pages, this method does nothing. | |
*/ | |
public void next() { | |
if (!subscription.isUnsubscribed() && hasNext()) { | |
pages.onNext(nextPage); | |
} | |
} | |
public interface PagingFunction<T> extends Func1<T, Observable<T>> { | |
} | |
private final class PageSubscriber extends Subscriber<I> { | |
private final Subscriber<? super O> inner; | |
public PageSubscriber(Subscriber<? super O> inner) { | |
this.inner = inner; | |
} | |
@Override | |
public void onCompleted() { | |
inner.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
inner.onError(e); | |
} | |
@Override | |
public void onNext(I result) { | |
nextPage = pagingFunction.call(result); | |
inner.onNext(pageTransformer.call(result)); | |
if (nextPage == FINISH_SEQUENCE) { | |
pages.onCompleted(); | |
} | |
} | |
} | |
} | |
@mttkay I did a Pager based on yours:
/**
 * Token-based pager: pages are requested by a page token of type {@code P} and each received page of type
 * {@code I} is folded together with the current token into the token of the following page.
 *
 * <p>A {@code null} token means that there are no more pages. The pager performs no synchronization of its own.
 *
 * @param <I> the type of a page result
 * @param <P> the type of the token identifying a page (for example a cursor string or an offset)
 */
public class Pager<I, P> {

    private final PublishSubject<P> pages;
    private P nextPageToken;

    private final Func2<P, I, P> pagingFunction;
    private final Func1<P, Observable<I>> obtainFunction;

    /**
     * Creates a pager.
     *
     * @param firstPage      the token of the first page to request
     * @param pagingFunction computes the next page token from the current token and the page just received;
     *                       returns {@code null} when there are no more pages
     * @param obtainFunction loads the page identified by a token
     * @return a new pager
     */
    public static <T, O> Pager<T, O> create(O firstPage, Func2<O, T, O> pagingFunction,
                                            Func1<O, Observable<T>> obtainFunction) {
        return new Pager<>(firstPage, pagingFunction, obtainFunction);
    }

    Pager(P firstPage, Func2<P, I, P> pagingFunction, Func1<P, Observable<I>> obtainFunction) {
        this.nextPageToken = firstPage;
        this.pagingFunction = pagingFunction;
        this.obtainFunction = obtainFunction;
        // Note: operators such as doOnNext return a new observable and leave the subject itself untouched,
        // so completion on a null token cannot be hooked in here; it is handled in page(P) instead.
        this.pages = PublishSubject.create();
    }

    /**
     * @return a sequence that, on each subscription, starts with the page for the current token and emits
     * further pages as {@link #next()} is called
     */
    public Observable<I> getObservable() {
        return Observable.defer(() -> page(nextPageToken));
    }

    /**
     * @return true, if a token for a further page is available.
     */
    public boolean hasNext() {
        return nextPageToken != null;
    }

    /**
     * Requests the next page, provided somebody is subscribed and a further page token is available;
     * otherwise this method does nothing.
     */
    public void next() {
        if (pages.hasObservers() && hasNext()) {
            pages.onNext(nextPageToken);
        }
    }

    /**
     * Builds the paged sequence: the given token followed by every token pushed through {@link #next()},
     * each resolved to its page via the obtain function.
     */
    private Observable<I> page(final P source) {
        return pages.startWith(source)
                .flatMap(obtainFunction)
                .doOnNext(pageResult -> {
                    nextPageToken = pagingFunction.call(nextPageToken, pageResult);
                    if (nextPageToken == null) {
                        // No further pages: close the token subject so the paged sequence completes
                        // once the page currently being delivered has finished.
                        pages.onCompleted();
                    }
                });
    }
}
The usage is:
Pager.create(initialPageToken, (oldPageToken, pageResult) -> pageResult.getNextPageToken(), datasource::getPage )
or if you want to use offset instead of token:
Pager.create(0, (offset, pageResult) -> offset + pageResult.size(), datasource::getPageWithOffset )
Basically, the first two parameters work like RxJava's `scan`: an initial value and an accumulator function.
I don't know if I may be introducing any problem, I just tried to simplify the code.
@mttkay Are you missing setting `nextPage = source` inside of the `public Observable<O> page(final Observable<I> source)` method? If the first request fails, then the `nextPage` Observable is never updated and is still equal to `FINISH_SEQUENCE`. This means you can't retry the first request using the `currentPage()` method as per your comment :(. Otherwise this solution is great, thank you!
@roman-mazur @mttkay the gist I provided on twitter does use backpressure as the paging control.