-
-
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
//
//  PaginationNetworkLogic.swift
//
//  Created by Daniel Tartaglia on 4/9/17.
//  Copyright © 2019 Daniel Tartaglia. MIT License
//
import RxSwift | |
/// Triggers the UI feeds into the pagination logic.
struct PaginationUISource {
    /// Each event drops every cached page and requests the first page again.
    let refresh: Observable<Void>

    /// Each event requests the page that follows the highest page loaded so far.
    let loadNextPage: Observable<Void>
}
/// Streams the pagination logic exposes to the UI.
struct PaginationSink<T> {
    /// `true` while a page request is outstanding, `false` otherwise.
    let isLoading: Observable<Bool>

    /// The items of all loaded pages, concatenated in ascending page order.
    let elements: Observable<[T]>

    /// Emits one event for every error produced by a page request.
    let error: Observable<Error>
}
extension PaginationSink {
    /// Wires the UI triggers to `loadData` and derives the loading, elements and error streams.
    ///
    /// Page 1 is requested immediately. A refresh requests page 1 again and replaces the
    /// whole cache with the result; a load-next-page request fetches the page after the
    /// highest cached one and merges it into the cache. Errors from `loadData` are routed
    /// to `error` instead of terminating the pipeline.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Called with the page number to fetch; the first page is requested as `1`.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) {
        // Cache of every loaded page, keyed by page number.
        let loadResults = BehaviorSubject<[Int: [T]]>(value: [:])

        // Highest cached page number. An empty cache counts as page 1, so a
        // "next page" request issued before the first response asks for page 2.
        let maxPage = loadResults
            .map { $0.keys.max() ?? 1 }

        // -1 is a sentinel page number meaning "refresh".
        let reload = ui.refresh
            .map { -1 }
        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }
        let start = Observable.merge(reload, loadNext, Observable.just(1))

        // One materialized event per response (or error); completions are dropped so a
        // finished request does not end the shared stream.
        let page = start
            .flatMap { page in
                Observable.combineLatest(Observable.just(page), loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
                    .materialize()
                    .filter { $0.isCompleted == false }
            }
            .share()

        // Fold successful responses into the cache. Empty pages are ignored unless they
        // answer a refresh, in which case they clear the cache down to an empty page 1.
        // NOTE(review): the Disposable returned by `subscribe` is discarded, so this
        // subscription is never explicitly disposed.
        _ = page
            .compactMap { $0.element }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .filter { $0.newPage.pageNumber == -1 || !$0.newPage.items.isEmpty }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .subscribe(loadResults)

        // Count requests in flight: +1 for every request started, -1 for every response or error.
        let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, accumulator: +)
            .map { $0 > 0 }
            .distinctUntilChanged()

        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        // `compactMap` drops the non-error events without force-unwrapping.
        let _error = page
            .compactMap { $0.error }

        isLoading = _isLoading
        elements = _elements
        error = _error
    }
}
/// Exercises `PaginationSink` with triggers and loaders driven by a `TestScheduler`.
class PaginationTests: XCTestCase {
    var testScheduler: TestScheduler!
    var bag: DisposeBag!
    var isLoading: TestableObserver<Bool>!
    var elements: TestableObserver<[Int]>!
    var error: TestableObserver<Error>!
    var dataLoader: DataLoader!

    override func setUp() {
        super.setUp()
        let scheduler = TestScheduler(initialClock: 0)
        testScheduler = scheduler
        bag = DisposeBag()
        isLoading = scheduler.createObserver(Bool.self)
        elements = scheduler.createObserver([Int].self)
        error = scheduler.createObserver(Error.self)
        dataLoader = DataLoader(testScheduler: scheduler)
    }

    // MARK: - Helpers

    /// Builds a sink whose UI triggers are cold observables replaying the given events.
    private func makeSink(
        refresh refreshEvents: [Recorded<Event<Void>>],
        loadNextPage nextPageEvents: [Recorded<Event<Void>>],
        loadData: @escaping (Int) -> Observable<[Int]>
    ) -> PaginationSink<Int> {
        let refreshTrigger = testScheduler.createColdObservable(refreshEvents)
        let loadNextPageTrigger = testScheduler.createColdObservable(nextPageEvents)
        let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())
        return PaginationSink(ui: source, loadData: loadData)
    }

    /// Subscribes the shared observers to the three outputs of `sink`.
    private func record(_ sink: PaginationSink<Int>) {
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.subscribe(error)
        )
    }

    // MARK: - Tests

    /// With no UI events, only the initial page is loaded; the outputs complete with the triggers.
    func testDefault() {
        let sink = makeSink(
            refresh: [Recorded<Event<Void>>.completed(30)],
            loadNextPage: [Recorded<Event<Void>>.completed(30)],
            loadData: dataLoader.loadData(page:)
        )
        let foundErrors = testScheduler.createObserver(Bool.self)
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.map { _ in true }.subscribe(foundErrors)
        )

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)])
        XCTAssertEqual(foundErrors.events, [.completed(30)])
        XCTAssertEqual(dataLoader.pages, [1])
    }

    /// A refresh requests page 1 a second time.
    func testRefresh() {
        let sink = makeSink(refresh: [.next(20, ())], loadNextPage: [], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    /// A refresh that returns no items empties the element list.
    func testRefreshNewLoadEmpty() {
        let sink = makeSink(refresh: [.next(20, ())], loadNextPage: [], loadData: dataLoader.loadPageThenEmpty(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    /// Loading the next page appends its items to the first page.
    func testNextPage() {
        let sink = makeSink(refresh: [], loadNextPage: [.next(20, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// A refresh after a second page has loaded falls back to the first page only.
    func testNextPageThenRefresh() {
        let sink = makeSink(refresh: [.next(40, ())], loadNextPage: [.next(20, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }

    /// The next page is requested while the initial page is still in flight.
    func testNextPageBeforeInitialPage() {
        let sink = makeSink(refresh: [], loadNextPage: [.next(5, ())], loadData: dataLoader.loadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// The second page's response arrives before the first page's response.
    func testNextPageComesBeforeInitialPage() {
        let sink = makeSink(refresh: [], loadNextPage: [.next(3, ())], loadData: dataLoader.reverseLoadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    /// A failing initial load reports one error and leaves the element list empty.
    func testLoadError() {
        let sink = makeSink(refresh: [], loadNextPage: [], loadData: dataLoader.errorLoadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]())])
        XCTAssertEqual(error.events.count, 1)
        XCTAssertEqual(dataLoader.pages, [1])
    }

    /// A failed next-page load reports its error; a later refresh still succeeds.
    func testErrorThenRetry() {
        let sink = makeSink(refresh: [.next(40, ())], loadNextPage: [.next(20, ())], loadData: dataLoader.errorThenSuccessLoadData(page:))
        record(sink)

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])])
        XCTAssertEqual(error.events.map { $0.time }, [30])
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }
}
/// Test double for a paged data source.
///
/// Every loader records the requested page number in `pages` and answers with a cold
/// observable created on the injected `TestScheduler`.
class DataLoader {
    /// Page numbers requested so far, in call order.
    private(set) var pages: [Int] = []
    private let testScheduler: TestScheduler

    init(testScheduler: TestScheduler) {
        self.testScheduler = testScheduler
    }

    /// Emits an empty page at tick 10, then completes.
    func loadEmpty(page: Int) -> Observable<[Int]> {
        pages.append(page)
        return testScheduler.createColdObservable([.next(10, []), .completed(10)])
            .asObservable()
    }

    /// Emits the three items of `page` at tick 10, then completes.
    func loadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        return testScheduler.createColdObservable([.next(10, result), .completed(10)])
            .asObservable()
    }

    /// Like `loadData(page:)`, but higher pages answer sooner (`15 - page * 5` ticks),
    /// so page 2 arrives before page 1.
    func reverseLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        let time = 15 - page * 5
        return testScheduler.createColdObservable([.next(time, result), .completed(time)])
            .asObservable()
    }

    /// Fails with an `NSError` at tick 10.
    func errorLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let error = NSError(domain: "testing", code: -1, userInfo: nil)
        return testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)])
            .asObservable()
    }

    /// Returns data for the very first request and an empty page for every later one.
    func loadPageThenEmpty(page: Int) -> Observable<[Int]> {
        if pages.isEmpty {
            return loadData(page: page)
        }
        else {
            return loadEmpty(page: page)
        }
    }

    /// Fails the second request only; every other request returns data.
    func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> {
        if pages.count == 1 {
            return errorLoadData(page: page)
        }
        else {
            return loadData(page: page)
        }
    }

    /// The three consecutive integers that make up `page`: page 1 is [1, 2, 3], page 2 is [4, 5, 6].
    private func items(forPage page: Int) -> [Int] {
        return (0..<3).map { page * 3 - (2 - $0) }
    }
}
@alielsokary Try something like:
/// Returns a page loader bound to `service` and `category`.
///
/// The returned closure takes only the page number, so it can be passed as the
/// `loadData` argument of `PaginationSink`.
func getNextData(service: Service, category: String) -> (_ page: Int) -> Observable<[TheData]> {
    let loadPage: (Int) -> Observable<[TheData]> = { pageNumber in
        return service.getNextData(category: category, page: pageNumber)
    }
    return loadPage
}
The function above will return a function you can pass to the PaginationSink.
let sink = PaginationSink(ui: source, loadData: getNextData(service: service, category: "home"))
Hi Daniel,
Thank you for your quick response. Are you sure `service.getNextData(section: section, page: page)` is correct? I ask because `page` is not a parameter of `getNextData`, and when I tried this in my code I got the error "Extra argument 'page' in the call". Also, `getNextData(section: section, page: page)` expects the argument `service`.
@alielsokary I don't know. That's your function. I see that I used `section` where your function has `category`, so I changed that in the above. The idea here is to make a factory function that takes the service and category as parameters and returns a function that takes the page number as a parameter; then you can use all three parameters to call your function.
It worked!
Thank you so much Daniel for your quick response and support :)
Would this work for a pagination with cursors?
@danielt1263 thanks again for a great sink! any chance of this being made into a pod?
I'm having trouble whereby `pagination.isLoading` stops emitting events.
I've simplified the reproducible state code as example:
// Reproduction: the loader picks one of two Singles depending on the current filter value.
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
// Non-empty filter: the result is delivered from a global dispatch queue.
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated).async {
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
}
return Disposables.create()
})
.asObservable()
} else {
// Nil or empty filter: the result is delivered synchronously inside the subscribe closure.
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
return Disposables.create()
})
.asObservable()
}
})
After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the problem in the above chunk of code is starting with an empty array: `.asObservable().startWith([])`.
I've tried to create a test to reflect my issue, but I'm not familiar enough with the RxTest framework to produce the appropriate events.
Best I can do for now:
/// Reproduction attempt for the stalled `isLoading` stream: the loader answers from a
/// global dispatch queue after a 5 second delay, and refreshes are triggered by a filter
/// change followed by a manual refresh.
func testLoadingEventsError() {
let showsFilter = BehaviorRelay<String?>(value: nil)
let refreshRelay = PublishRelay<Void>.init()
// Refresh fires on every filter change after the initial value, and on every refreshRelay event.
let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
let loadNextPageTrigger = Observable<Void>.empty()
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())
let expectationOne = XCTestExpectation(description: "Fetch filter")
let expectationTwo = XCTestExpectation(description: "Fetch reload")
let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
// Non-empty filter: succeed with the filter's characters parsed as Ints.
// The force-unwrap crashes if the filter contains a non-digit character.
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationOne.fulfill()
single(.success(filter.map({ Int(String($0))! }) ))
})
return Disposables.create()
})
.asObservable()
} else {
// Nil or empty filter: succeed with an empty page.
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationTwo.fulfill()
single(.success([]))
})
return Disposables.create()
})
.asObservable()
}
}
bag.insert(
sink.isLoading.subscribe(isLoading),
sink.elements.subscribe(elements),
sink.error.subscribe(error)
)
// Trigger one refresh through the filter, then one manual refresh.
// NOTE(review): the filter is still "123" at the manual refresh, so both refreshes
// appear to take the non-empty-filter branch — confirm expectationOne tolerates
// being fulfilled twice.
showsFilter.accept("123")
refreshRelay.accept(())
// Block until both loader branches have run, for at most 15 seconds.
wait(for: [expectationOne, expectationTwo], timeout: 15.0)
// NOTE(review): testScheduler is never started here, which is presumably why every
// expected event is recorded at time 0 — confirm.
XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}
Hi Daniel,
First, I want to personally thank you for this approach.
I have a question.
I'm trying to use the paginationSink like so:
where `getNextData` is `func getNextData(page: Int) -> Observable<[TheData]>`.
This works fine, but I want to add a parameter to `getNextData`, like so: `getNextData(category: String, page: Int) -> Observable<[TheData]>`
and use it like so:
let sink =
PaginationSink(ui: source, loadData: service.getNextData(section: "home", page:1))
I get
Cannot convert value of type 'Observable<[TheData]>' to expected argument type '(Int) -> Observable<[_]>'
I understand the error occurs because the `loadData:` parameter of the `PaginationSink` init expects a closure of type `(Int) -> Observable<[T]>`.
I know that the solution for this may be something silly, but I'm not sure what the next step is here.
Thank you