Last active
April 18, 2022 19:41
-
-
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// PaginationNetworkLogic.swift | |
// | |
// Created by Daniel Tartaglia on 4/9/17. | |
// Copyright © 2019 Daniel Tartaglia. MIT License | |
// | |
import RxSwift | |
struct PaginationUISource { | |
/// reloads first page and dumps all other cached pages. | |
let refresh: Observable<Void> | |
/// loads next page | |
let loadNextPage: Observable<Void> | |
} | |
struct PaginationSink<T> { | |
/// true if network loading is in progress. | |
let isLoading: Observable<Bool> | |
/// elements from all loaded pages | |
let elements: Observable<[T]> | |
/// fires once for each error | |
let error: Observable<Error> | |
} | |
extension PaginationSink { | |
init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>) | |
{ | |
let loadResults = BehaviorSubject<[Int: [T]]>(value: [:]) | |
let maxPage = loadResults | |
.map { $0.keys } | |
.map { $0.max() ?? 1 } | |
let reload = ui.refresh | |
.map { -1 } | |
let loadNext = ui.loadNextPage | |
.withLatestFrom(maxPage) | |
.map { $0 + 1 } | |
let start = Observable.merge(reload, loadNext, Observable.just(1)) | |
let page = start | |
.flatMap { page in | |
Observable.combineLatest(Observable.just(page), loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) } | |
.materialize() | |
.filter { $0.isCompleted == false } | |
} | |
.share() | |
_ = page | |
.compactMap { $0.element } | |
.withLatestFrom(loadResults) { (pages: $1, newPage: $0) } | |
.filter { $0.newPage.pageNumber == -1 || !$0.newPage.items.isEmpty } | |
.map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) } | |
.subscribe(loadResults) | |
let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 }) | |
.scan(0, accumulator: +) | |
.map { $0 > 0 } | |
.distinctUntilChanged() | |
let _elements = loadResults | |
.map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } } | |
let _error = page | |
.map { $0.error } | |
.filter { $0 != nil } | |
.map { $0! } | |
isLoading = _isLoading | |
elements = _elements | |
error = _error | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PaginationTests: XCTestCase { | |
var testScheduler: TestScheduler! | |
var bag: DisposeBag! | |
var isLoading: TestableObserver<Bool>! | |
var elements: TestableObserver<[Int]>! | |
var error: TestableObserver<Error>! | |
var dataLoader: DataLoader! | |
override func setUp() { | |
super.setUp() | |
testScheduler = TestScheduler(initialClock: 0) | |
bag = DisposeBag() | |
isLoading = testScheduler.createObserver(Bool.self) | |
elements = testScheduler.createObserver([Int].self) | |
error = testScheduler.createObserver(Error.self) | |
dataLoader = DataLoader(testScheduler: testScheduler) | |
} | |
func testDefault() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>.completed(30)]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>.completed(30)]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
let foundErrors = testScheduler.createObserver(Bool.self) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.map { _ in true }.subscribe(foundErrors) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)]) | |
XCTAssertEqual(foundErrors.events, [.completed(30)]) | |
XCTAssertEqual(dataLoader.pages, [1]) | |
} | |
func testRefresh() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 1]) | |
} | |
func testRefreshNewLoadEmpty() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadPageThenEmpty(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 1]) | |
} | |
func testNextPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testNextPageThenRefresh() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(40, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2, 1]) | |
} | |
func testNextPageBeforeInitialPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(5, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.loadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testNextPageComesBeforeInitialPage() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(3, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.reverseLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])]) | |
XCTAssertTrue(error.events.isEmpty) | |
XCTAssertEqual(dataLoader.pages, [1, 2]) | |
} | |
func testLoadError() { | |
let refreshTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let loadNextPageTrigger = testScheduler.createColdObservable([Recorded<Event<Void>>]()) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.errorLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]())]) | |
XCTAssertEqual(error.events.count, 1) | |
XCTAssertEqual(dataLoader.pages, [1]) | |
} | |
func testErrorThenRetry() { | |
let refreshTrigger = testScheduler.createColdObservable([.next(40, ())]) | |
let loadNextPageTrigger = testScheduler.createColdObservable([.next(20, ())]) | |
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable()) | |
let sink = PaginationSink(ui: source, loadData: dataLoader.errorThenSuccessLoadData(page:)) | |
bag.insert( | |
sink.isLoading.subscribe(isLoading), | |
sink.elements.subscribe(elements), | |
sink.error.subscribe(error) | |
) | |
testScheduler.start() | |
XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)]) | |
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])]) | |
XCTAssertEqual(error.events.map { $0.time }, [30]) | |
XCTAssertEqual(dataLoader.pages, [1, 2, 1]) | |
} | |
} | |
class DataLoader { | |
private (set) var pages: [Int] = [] | |
private let testScheduler: TestScheduler | |
init(testScheduler: TestScheduler) { | |
self.testScheduler = testScheduler | |
} | |
func loadEmpty(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
return self.testScheduler.createColdObservable([.next(10, []), .completed(10)]) | |
.asObservable() | |
} | |
func loadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let result = Array((0..<3).map { page * 3 - (2 - $0) }) | |
return self.testScheduler.createColdObservable([.next(10, result), .completed(10)]) | |
.asObservable() | |
} | |
func reverseLoadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let result = Array((0..<3).map { page * 3 - (2 - $0) }) | |
let time = 15 - page * 5 | |
return self.testScheduler.createColdObservable([.next(time, result), .completed(time)]) | |
.asObservable() | |
} | |
func errorLoadData(page: Int) -> Observable<[Int]> { | |
pages.append(page) | |
let error = NSError(domain: "testing", code: -1, userInfo: nil) | |
return self.testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)]) | |
.asObservable() | |
} | |
func loadPageThenEmpty(page: Int) -> Observable<[Int]> { | |
if pages.count == 0 { | |
return loadData(page: page) | |
} | |
else { | |
return loadEmpty(page: page) | |
} | |
} | |
func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> { | |
if pages.count == 1 { | |
return errorLoadData(page: page) | |
} | |
else { | |
return loadData(page: page) | |
} | |
} | |
} |
Would this work for a pagination with cursors?
@danielt1263 thanks again for a great sink! any chance of this being made into a pod?
I'm having trouble where by the pagination.isLoading
stops emitting events.
I've simplified the reproducible state code as example:
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated).async {
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
}
return Disposables.create()
})
.asObservable()
} else {
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
return Disposables.create()
})
.asObservable()
}
})
After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the above chunk of code is starting with empty array .asObservable().startWith([])
I've tried to create a test to reflect my issue, but not so familiar with RxTest
framework to produce appropriate events.
Best I can do for now:
func testLoadingEventsError() {
let showsFilter = BehaviorRelay<String?>(value: nil)
let refreshRelay = PublishRelay<Void>.init()
let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
let loadNextPageTrigger = Observable<Void>.empty()
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())
let expectationOne = XCTestExpectation(description: "Fetch filter")
let expectationTwo = XCTestExpectation(description: "Fetch reload")
let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
if let filter = showsFilter.value, filter.isEmpty == false {
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationOne.fulfill()
single(.success(filter.map({ Int(String($0))! }) ))
})
return Disposables.create()
})
.asObservable()
} else {
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationTwo.fulfill()
single(.success([]))
})
return Disposables.create()
})
.asObservable()
}
}
bag.insert(
sink.isLoading.subscribe(isLoading),
sink.elements.subscribe(elements),
sink.error.subscribe(error)
)
showsFilter.accept("123")
refreshRelay.accept(())
wait(for: [expectationOne, expectationTwo], timeout: 15.0)
XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It worked!
Thank you so much Daniel for your quick response and support :)