-
-
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
// | |
// PaginationNetworkLogic.swift | |
// | |
// Created by Daniel Tartaglia on 4/9/17. | |
// Copyright © 2019 Daniel Tartaglia. MIT License | |
// | |
import RxSwift | |
struct PaginationUISource { | |
/// reloads first page and dumps all other cached pages. | |
let refresh: Observable<Void> | |
/// loads next page | |
let loadNextPage: Observable<Void> | |
} | |
struct PaginationSink<T> { | |
/// true if network loading is in progress. | |
let isLoading: Observable<Bool> | |
/// elements from all loaded pages | |
let elements: Observable<[T]> | |
/// fires once for each error | |
let error: Observable<Error> | |
} | |
extension PaginationSink { | |
init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) | |
{ | |
let loadResults = BehaviorSubject<[Int: [T]]>(value: [:]) | |
let maxPage = loadResults | |
.map { $0.keys } | |
.map { $0.max() ?? 1 } | |
let reload = ui.refresh | |
.map { -1 } | |
let loadNext = ui.loadNextPage | |
.withLatestFrom(maxPage) | |
.map { $0 + 1 } | |
let start = Observable.merge(reload, loadNext, Observable.just(1)) | |
let page = start | |
.flatMap { page in | |
Observable.combineLatest(Observable.just(page), loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) } | |
.materialize() | |
.filter { $0.isCompleted == false } | |
} | |
.share() | |
_ = page | |
.compactMap { $0.element } | |
.withLatestFrom(loadResults) { (pages: $1, newPage: $0) } | |
.filter { $0.newPage.pageNumber == -1 || !$0.newPage.items.isEmpty } | |
.map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) } | |
.subscribe(loadResults) | |
let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 }) | |
.scan(0, accumulator: +) | |
.map { $0 > 0 } | |
.distinctUntilChanged() | |
let _elements = loadResults | |
.map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } } | |
let _error = page | |
.map { $0.error } | |
.filter { $0 != nil } | |
.map { $0! } | |
isLoading = _isLoading | |
elements = _elements | |
error = _error | |
} | |
} |
class PaginationTests: XCTestCase { | |
var testScheduler: TestScheduler! | |
var bag: DisposeBag! | |
var isLoading: TestableObserver<Bool>! | |
var elements: TestableObserver<[Int]>! | |
var error: TestableObserver<Error>! | |
var dataLoader: DataLoader! | |
override func setUp() { | |
super.setUp() | |
testScheduler = TestScheduler(initialClock: 0) | |
bag = DisposeBag() | |
isLoading = testScheduler.createObserver(Bool.self) | |
elements = testScheduler.createObserver([Int].self) | |
error = testScheduler.createObserver(Error.self) | |
dataLoader = DataLoader(testScheduler: testScheduler) | |
} | |
func testDefault() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>.completed(30)]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>.completed(30)]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
let foundErrors = testScheduler.createObserver(Bool.self) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.map { _ in true }.subscribe(foundErrors) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)]) | |
XCTAssertEqual(foundErrors.events, [.completed(30)]) | |
XCTAssertEqual(dataLoader.pages, [1]) | |
} | |
func testRefresh() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 1]) | |
} | |
func testRefreshNewLoadEmpty() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadPageThenEmpty(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 1]) | |
} | |
func testNextPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testNextPageThenRefresh() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(40, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2, 1]) | |
} | |
func testNextPageBeforeInitialPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(5, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testNextPageComesBeforeInitialPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(3, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.reverseLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testLoadError() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.errorLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]())]) | |
XCTAssertEqual(error.events.count, 1) | |
XCTAssertEqual(dataLoader.pages, [1]) | |
} | |
func testErrorThenRetry() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(40, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.errorThenSuccessLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])]) | |
XCTAssertEqual(error.events.map { $0.time }, [30]) | |
XCTAssertEqual(dataLoader.pages, [1, 2, 1]) | |
} | |
} | |
class DataLoader { | |
private (set) var pages: [Int] = [] | |
private let testScheduler: TestScheduler | |
init(testScheduler: TestScheduler) { | |
self.testScheduler = testScheduler | |
} | |
func loadEmpty(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
return self.testScheduler.createColdObservable([.next(10, []), .completed(10)]) | |
.asObservable() | |
} | |
func loadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let result = Array((0..<3).map { page * 3 - (2 - $0) }) | |
return self.testScheduler.createColdObservable([.next(10, result), .completed(10)]) | |
.asObservable() | |
} | |
func reverseLoadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let result = Array((0..<3).map { page * 3 - (2 - $0) }) | |
let time = 15 - page * 5 | |
return self.testScheduler.createColdObservable([.next(time, result), .completed(time)]) | |
.asObservable() | |
} | |
func errorLoadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let error = NSError(domain: "testing", code: -1, userInfo: nil) | |
return self.testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)]) | |
.asObservable() | |
} | |
func loadPageThenEmpty(page: Int) -> Observable<[Int]> { | |
if pages.count == 0 { | |
return loadData(page: page) | |
} | |
else { | |
return loadEmpty(page: page) | |
} | |
} | |
func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> { | |
if pages.count == 1 { | |
return errorLoadData(page: page) | |
} | |
else { | |
return loadData(page: page) | |
} | |
} | |
} |
what's the best way to return the current page number to the subscriber ?
@mesheilah I think if I actually wanted individual pages, I would have the let elements: Observable<[T]>
be a let elements: Observable<[[T]]>
instead and return each page as a separate sub-array. The number of pages would be elements.map { $0.count }
.
I meant by current page is the page that is currently loading, I thought it would be better to use isLoading as Observable<(Bool, Int)> to hold the current page number being loaded. but can't figure out where to glue the page number with isLoading.
The following is a Combine
version I've written for this gist today:
import Foundation
import Combine
struct PaginationUISource {
/// reloads first page and dumps all other cached pages.
let refresh: AnyPublisher<Void, Error>
/// loads next page
let loadNextPage: AnyPublisher<Void, Error>
}
struct PaginationSink<T> {
fileprivate var subscriptions = Set<AnyCancellable>()
/// true if network loading is in progress.
let isLoading: AnyPublisher<Bool, Error>
/// elements from all loaded pages
let elements: AnyPublisher<[T], Error>
}
extension PaginationSink {
init(ui: PaginationUISource, loadData: @escaping (Int) -> AnyPublisher<[T], Error>) {
let loadResults = CurrentValueSubject<[Int: [T]], Error>([:])
let maxPage = loadResults
.map { $0.keys }
.map{ $0.max() ?? 1 }
let reload = ui.refresh
.map{ -1 }
let loadNext = ui.loadNextPage
.withLatestFrom(maxPage)
.map { $0 + 1 }
let start = Publishers.Merge(reload, loadNext)
let page = start
.flatMap { page in
Just(page)
.setFailureType(to: Error.self)
.combineLatest(loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
}
.share()
page
.filter { !$0.items.isEmpty }
.withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
.map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
.print("LoadResults") // <--- at this point loadResults can't get filled with pages
.subscribe(loadResults)
.store(in: &subscriptions)
let _isLoading = Publishers.Merge(start.map { _ in 1 }, page.map { _ in -1 })
.scan(0, +)
.map { $0 > 0 }
.removeDuplicates()
let _elements = loadResults
.map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }
isLoading = _isLoading.eraseToAnyPublisher()
elements = _elements.eraseToAnyPublisher()
}
}
I have used Shai's gist to use withLatestFrom
custom operator with Combine
which's supposed to be working fine after testing it with multiple scenarios. However, the issue I'm facing is that loadResults
subject doesn't get filled with new pages, I have placed a print("LoadResults")
operator as appears in my code above and the console prints the following output:
LoadResults: receive subscription: ((extension in TestSwiftUI):Combine.Publishers.WithLatestFrom<Combine.Publishers.Filter<Comb ... blah blah blah
LoadResults: request unlimited
as you see above, there is no LoadResults: receive value
entry in the log, which means that loadResults
subject is not filled with pages.
what's causing that weird issue ?
Regarding Combine version above, it has no issues at all, the issue I was facing was because the refreshSubject
didn't call send
in the consumer class (or the view model) to trigger fetching the first page.
Hi Daniel,
First, I want to personally thank you for this approach.
I have a question.
I'm trying to use the paginationSink like so:
let source = PaginationUISource(refresh: _refreshTrigger, loadNextPage: _nextPageTrigger)
let sink = PaginationSink(ui: source, loadData: service.getNextData(page:))
Where getNextData
is
func getNextData(page: Int) -> Observable<[TheData]>
This works fine but I want to add a parameter to getNextData
like so getNextData(category: String, page: Int) -> Observable<[TheData]>
and use it like so:
let sink = PaginationSink(ui: source, loadData: service.getNextData(section: "home", page:1))
I get Cannot convert value of type 'Observable<[TheData]>' to expected argument type '(Int) -> Observable<[_]>'
I understand the error is because of the PaginationSink
init loadData:
expects a closure of (Int) -> Observable<[T]>
I know that the solution for this maybe something silly but I'm not sure what is next step here.
Thank you
@alielsokary Try something like:
func getNextData(service: Service, category: String) -> (_ page: Int) -> Observable<[TheData]> {
return { page in
service.getNextData(category: category, page: page)
}
}
The function above will return a function you can pass to the PaginationSink.
let sink = PaginationSink(ui: source, loadData: getNextData(service: service, category: "home"))
Hi Daniel,
Thank you for your quick response. are you sure service.getNextData(section: section, page: page)
is correct?. because page
is not a parameter in getNextData and when I tried this in my code I got an error Extra argument 'page' in the call. also the
getNextData(section: section, page: page)expects argument
service`
@alielsokary I don't know. That's your function. I see that I used section
where your function has category
so I changed that in the above. The idea here is to make a factory function that takes the service and category as parameters and returns a function that takes the page number as a parameter, then you can use all three parameters to call your function.
It worked!
Thank you so much Daniel for your quick response and support :)
Would this work for a pagination with cursors?
@danielt1263 thanks again for a great sink! any chance of this being made into a pod?
I'm having trouble where by the pagination.isLoading
stops emitting events.
I've simplified the reproducible state code as example:
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated).async {
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
}
return Disposables.create()
})
.asObservable()
} else {
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
return Disposables.create()
})
.asObservable()
}
})
After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the above chunk of code is starting with empty array .asObservable().startWith([])
I've tried to create a test to reflect my issue, but not so familiar with RxTest
framework to produce appropriate events.
Best I can do for now:
func testLoadingEventsError() {
let showsFilter = BehaviorRelay<String?>(value: nil)
let refreshRelay = PublishRelay<Void>.init()
let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
let loadNextPageTrigger = Observable<Void>.empty()
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())
let expectationOne = XCTestExpectation(description: "Fetch filter")
let expectationTwo = XCTestExpectation(description: "Fetch reload")
let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationOne.fulfill()
single(.success(filter.map({ Int(String($0))! }) ))
})
return Disposables.create()
})
.asObservable()
} else {
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationTwo.fulfill()
single(.success([]))
})
return Disposables.create()
})
.asObservable()
}
}
bag.insert(
sink.isLoading.subscribe(isLoading),
sink.elements.subscribe(elements),
sink.error.subscribe(error)
)
showsFilter.accept("123")
refreshRelay.accept(())
wait(for: [expectationOne, expectationTwo], timeout: 15.0)
XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}
Thanks Daniel, I got a tiny improvement: replace lines 70-71 with
compactMap{ $0 }
.at line 58, you have to dispose the subscription, why did you just assign it to an underscore ?
Anyway, If you want to make your pagination sink perfect, there is a scenario in which the
isLoading
observable emits wrong value, that happens when theloadData
function returns immediate result without having to wait a period of time, say when retrieving data from a cachedURLSession
the result returns immediately as opposed to a network call,isLoading
at first call emitstrue
and doesn't emit a value for subsequent calls. Adding something like:.delay(.milliseconds(1), scheduler: MainScheduler.instance)
for theURLSession
's observable will makeisLoading
works as expected. I don't have a clue on how to change lines 60-63 to handle this scenario, but it's not a big issue as it's a tiny bug that won't happen in most scenarios.