-
-
Save danielt1263/10bc5eb821c752ad45f281c6f4e3034b to your computer and use it in GitHub Desktop.
// | |
// PaginationNetworkLogic.swift | |
// | |
// Created by Daniel Tartaglia on 4/9/17. | |
// Copyright © 2019 Daniel Tartaglia. MIT License | |
// | |
import RxSwift | |
/// Input triggers that drive the pagination logic.
struct PaginationUISource {

    /// Each event reloads the first page and discards every other cached page.
    let refresh: Observable<Void>

    /// Each event requests the page after the highest one loaded so far.
    let loadNextPage: Observable<Void>
}
/// Output streams produced by the pagination logic for elements of type `T`.
struct PaginationSink<T> {

    /// `true` while a network load is in progress.
    let isLoading: Observable<Bool>

    /// The elements of all loaded pages.
    let elements: Observable<[T]>

    /// Fires once for each load error.
    let error: Observable<Error>
}
extension PaginationSink {
    /// Wires the UI triggers to the page loader and exposes the resulting streams.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Called with a page number (page 1 for a refresh) and expected to
    ///     return an observable carrying that page's items.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> Observable<[T]>)
    {
        // Cache of every loaded page, keyed by page number. Starts empty, so
        // `elements` emits an empty array immediately on subscription.
        let loadResults = BehaviorSubject<[Int: [T]]>(value: [:])

        // Highest cached page number; falls back to 1 while the cache is empty,
        // so a load-next request made before page 1 arrives asks for page 2.
        let maxPage = loadResults
            .map { $0.keys }
            .map { $0.max() ?? 1 }

        // -1 is the sentinel for "refresh": page 1 is loaded and replaces the cache.
        let reload = ui.refresh
            .map { -1 }

        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        // `Observable.just(1)` triggers the initial load of page 1 on subscription.
        let start = Observable.merge(reload, loadNext, Observable.just(1))

        // One materialized event (next or error) per request. Completion events are
        // dropped so that finishing one request does not end the outer stream.
        let page = start
            .flatMap { page in
                Observable.combineLatest(Observable.just(page), loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
                    .materialize()
                    .filter { $0.isCompleted == false }
            }
            .share()

        // Feed successful loads back into the cache. Empty pages are ignored unless
        // they come from a refresh, in which case the cache is replaced.
        // NOTE(review): the returned Disposable is discarded, so this subscription
        // is never explicitly disposed.
        _ = page
            .compactMap { $0.element }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .filter { $0.newPage.pageNumber == -1 || !$0.newPage.items.isEmpty }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .subscribe(loadResults)

        // +1 for every request started, -1 for every result received; loading while > 0.
        let _isLoading = Observable.merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, accumulator: +)
            .map { $0 > 0 }
            .distinctUntilChanged()

        // Flatten the cached pages in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        // Keep only the error events; `compactMap` avoids force-unwrapping.
        let _error = page
            .compactMap { $0.error }

        isLoading = _isLoading
        elements = _elements
        error = _error
    }
}
/// Exercises `PaginationSink` against scripted triggers and a scripted `DataLoader`.
class PaginationTests: XCTestCase {

    var testScheduler: TestScheduler!
    var bag: DisposeBag!
    var isLoading: TestableObserver<Bool>!
    var elements: TestableObserver<[Int]>!
    var error: TestableObserver<Error>!
    var dataLoader: DataLoader!

    override func setUp() {
        super.setUp()
        let scheduler = TestScheduler(initialClock: 0)
        testScheduler = scheduler
        bag = DisposeBag()
        isLoading = scheduler.createObserver(Bool.self)
        elements = scheduler.createObserver([Int].self)
        error = scheduler.createObserver(Error.self)
        dataLoader = DataLoader(testScheduler: scheduler)
    }

    // MARK: - Helpers

    /// Builds a sink whose triggers are cold observables replaying the given events.
    private func makeSink(
        refresh: [Recorded<Event<Void>>] = [],
        loadNextPage: [Recorded<Event<Void>>] = [],
        loadData: @escaping (Int) -> Observable<[Int]>
    ) -> PaginationSink<Int> {
        let refreshTrigger = testScheduler.createColdObservable(refresh)
        let loadNextPageTrigger = testScheduler.createColdObservable(loadNextPage)
        let source = PaginationUISource(
            refresh: refreshTrigger.asObservable(),
            loadNextPage: loadNextPageTrigger.asObservable()
        )
        return PaginationSink(ui: source, loadData: loadData)
    }

    /// Subscribes the three shared observers to the sink's outputs.
    private func observe(_ sink: PaginationSink<Int>) {
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.subscribe(error)
        )
    }

    // MARK: - Tests

    func testDefault() {
        let sink = makeSink(
            refresh: [.completed(30)],
            loadNextPage: [.completed(30)],
            loadData: dataLoader.loadData(page:)
        )
        // Errors are mapped to Bool here so the recorded events can be compared.
        let foundErrors = testScheduler.createObserver(Bool.self)
        bag.insert(
            sink.isLoading.subscribe(isLoading),
            sink.elements.subscribe(elements),
            sink.error.map { _ in true }.subscribe(foundErrors)
        )

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .completed(30)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .completed(30)])
        XCTAssertEqual(foundErrors.events, [.completed(30)])
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testRefresh() {
        observe(makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadData(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testRefreshNewLoadEmpty() {
        observe(makeSink(refresh: [.next(20, ())], loadData: dataLoader.loadPageThenEmpty(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 1])
    }

    func testNextPage() {
        observe(makeSink(loadNextPage: [.next(20, ())], loadData: dataLoader.loadData(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageThenRefresh() {
        observe(makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.loadData(page:)
        ))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(30, [1, 2, 3, 4, 5, 6]), .next(50, [1, 2, 3])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }

    func testNextPageBeforeInitialPage() {
        observe(makeSink(loadNextPage: [.next(5, ())], loadData: dataLoader.loadData(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(15, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(15, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testNextPageComesBeforeInitialPage() {
        observe(makeSink(loadNextPage: [.next(3, ())], loadData: dataLoader.reverseLoadData(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(8, [4, 5, 6]), .next(10, [1, 2, 3, 4, 5, 6])])
        XCTAssertTrue(error.events.isEmpty)
        XCTAssertEqual(dataLoader.pages, [1, 2])
    }

    func testLoadError() {
        observe(makeSink(loadData: dataLoader.errorLoadData(page:)))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]())])
        XCTAssertEqual(error.events.count, 1)
        XCTAssertEqual(dataLoader.pages, [1])
    }

    func testErrorThenRetry() {
        observe(makeSink(
            refresh: [.next(40, ())],
            loadNextPage: [.next(20, ())],
            loadData: dataLoader.errorThenSuccessLoadData(page:)
        ))

        testScheduler.start()

        XCTAssertEqual(isLoading.events, [.next(0, true), .next(10, false), .next(20, true), .next(30, false), .next(40, true), .next(50, false)])
        XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(10, [1, 2, 3]), .next(50, [1, 2, 3])])
        XCTAssertEqual(error.events.map { $0.time }, [30])
        XCTAssertEqual(dataLoader.pages, [1, 2, 1])
    }
}
/// Scripted page loader for the pagination tests.
///
/// Every load is recorded in `pages` and answered with a cold observable
/// created on the test scheduler.
class DataLoader {

    /// Page numbers requested so far, in request order.
    private(set) var pages: [Int] = []

    private let testScheduler: TestScheduler

    init(testScheduler: TestScheduler) {
        self.testScheduler = testScheduler
    }

    /// Records the request and emits an empty page at time 10.
    func loadEmpty(page: Int) -> Observable<[Int]> {
        pages.append(page)
        return testScheduler.createColdObservable([.next(10, []), .completed(10)])
            .asObservable()
    }

    /// Records the request and emits the page's three items at time 10.
    func loadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        return testScheduler.createColdObservable([.next(10, result), .completed(10)])
            .asObservable()
    }

    /// Like `loadData(page:)`, but the delay is `15 - page * 5`, so page 2
    /// (delay 5) takes less time than page 1 (delay 10).
    func reverseLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let result = items(forPage: page)
        let time = 15 - page * 5
        return testScheduler.createColdObservable([.next(time, result), .completed(time)])
            .asObservable()
    }

    /// Records the request and fails with a test error at time 10.
    func errorLoadData(page: Int) -> Observable<[Int]> {
        pages.append(page)
        let error = NSError(domain: "testing", code: -1, userInfo: nil)
        return testScheduler.createColdObservable([Recorded<Event<[Int]>>.error(10, error)])
            .asObservable()
    }

    /// The first request returns data; every later request returns an empty page.
    func loadPageThenEmpty(page: Int) -> Observable<[Int]> {
        if pages.isEmpty {
            return loadData(page: page)
        }
        else {
            return loadEmpty(page: page)
        }
    }

    /// Fails only when exactly one request has been recorded before this one
    /// (that is, on the second request); succeeds otherwise.
    func errorThenSuccessLoadData(page: Int) -> Observable<[Int]> {
        if pages.count == 1 {
            return errorLoadData(page: page)
        }
        else {
            return loadData(page: page)
        }
    }

    /// The three consecutive integers for a page: page 1 is [1, 2, 3], page 2 is [4, 5, 6].
    private func items(forPage page: Int) -> [Int] {
        return (0..<3).map { page * 3 - (2 - $0) }
    }
}
I created a view controller to test your gist with the reqres public API. I just wanted to call their API without paging to make sure the gist loads the first page correctly, but it emits an empty array first and only then calls the service. Here is what I've tried:
(1) I changed your PaginationUISource struct to a protocol so that I can easily re-use it in any view controller:
/// Protocol form of the pagination triggers, restricted to class types.
///
/// `AnyObject` replaces the deprecated `class` constraint; the two are equivalent.
protocol PaginationUISource: AnyObject {
    /// reloads first page and dumps all other cached pages.
    var refresh: Observable<Void> { get }
    /// loads next page
    var loadNextPage: Observable<Void> { get }
}
(2) I used the following two methods in my view controller:
/// Sends a plain `APIRequest` and returns the decoded users.
///
/// The page number is deliberately ignored so that only a regular,
/// non-paged call is tested.
func getData(forPage: Int) -> Observable<[UserModel]> {
    return apiCalling.send(apiRequest: APIRequest())
}
/// Creates a pagination sink and logs its `elements` and `isLoading` streams.
@IBAction func getData(_ sender: Any) {
    let sink = PaginationSink(ui: self, loadData: getData(forPage:))

    // `debug` only wraps the stream; logging starts when it is subscribed below.
    let loggedElements = sink.elements.debug("ELEMENTS")
    let loggedLoading = sink.isLoading.debug("LOADING")

    loggedElements
        .subscribe { _ in }
        .disposed(by: disposeBag)
    loggedLoading
        .subscribe { _ in }
        .disposed(by: disposeBag)
}
output:
ELEMENTS -> subscribed
ELEMENTS -> Event next([]) <==== this empty array is emitted due to using a BehaviorSubject for loadResults
LOADING -> subscribed
LOADING -> Event next(true)
ELEMENTS -> Event next([ .... Array of data ... ])
LOADING -> Event next(false)
two concerns here:
(1) Using a BehaviorSubject for the loadResults variable emits an empty array before the service is called. I tried replacing it with a PublishSubject, but then no elements are emitted, because a PublishSubject doesn't keep the last value. I don't have a clue how to get rid of this empty array, given that I don't want to use skip(1) in the view controller to skip it, because that is not a clean solution.
(2) is there a way to trigger calling the API using a Void observer (trigger) instead of calling it at init ? like a button.rx.tap
trigger.
There is no need to change the PaginationUISource to a protocol. All you need to do is create an instance of PaginationUISource and pass it in. If you like doing the extra work to conform to the protocol, you are welcome to do it, but it is not necessary. Just like it's not necessary to turn Int
into a protocol in order to pass an integer into a function.
Regarding your concerns:
(1) You can remove the initial empty array emission by adding .skip(1)
between lines 65 and 66.
(2) Yes there's a way to avoid calling the api service on init... Remove the Observable.just(1)
bit from the merge on line 43.
Thanks Daniel, I got a tiny improvement: replace lines 70-71 with compactMap{ $0 }
.
at line 58, you have to dispose the subscription, why did you just assign it to an underscore ?
Anyway, if you want to make your pagination sink perfect, there is a scenario in which the isLoading observable emits a wrong value. It happens when the loadData function returns its result immediately instead of after a delay, for example when the data comes from a cached URLSession response rather than a network call. In that case isLoading emits true on the first call and emits nothing for subsequent calls. Adding something like .delay(.milliseconds(1), scheduler: MainScheduler.instance) to the URLSession observable makes isLoading work as expected. I don't have a clue how to change lines 60-63 to handle this scenario, but it's not a big issue, as it's a tiny bug that won't happen in most scenarios.
what's the best way to return the current page number to the subscriber ?
@mesheilah I think if I actually wanted individual pages, I would have the let elements: Observable<[T]>
be a let elements: Observable<[[T]]>
instead and return each page as a separate sub-array. The number of pages would be elements.map { $0.count }
.
By "current page" I meant the page that is currently loading. I thought it would be better to make isLoading an Observable<(Bool, Int)> that also carries the number of the page being loaded, but I can't figure out where to attach the page number to isLoading.
The following is a Combine
version I've written for this gist today:
import Foundation
import Combine
/// Input triggers for the Combine version of the pagination logic.
struct PaginationUISource {

    /// Each value reloads the first page and discards every other cached page.
    let refresh: AnyPublisher<Void, Error>

    /// Each value requests the next page.
    let loadNextPage: AnyPublisher<Void, Error>
}
/// Output publishers of the Combine version of the pagination logic.
struct PaginationSink<T> {

    /// Holds the cancellables created while wiring the sink.
    fileprivate var subscriptions: Set<AnyCancellable> = []

    /// `true` while a network load is in progress.
    let isLoading: AnyPublisher<Bool, Error>

    /// The elements of all loaded pages.
    let elements: AnyPublisher<[T], Error>
}
extension PaginationSink {
    /// Wires the UI triggers to the page loader and exposes the resulting publishers.
    ///
    /// Nothing is loaded until `ui.refresh` or `ui.loadNextPage` emits, because
    /// `start` merges only those two triggers.
    ///
    /// - Parameters:
    ///   - ui: The refresh and load-next-page triggers.
    ///   - loadData: Called with a page number (page 1 for a refresh) and expected to
    ///     return a publisher carrying that page's items.
    init(ui: PaginationUISource, loadData: @escaping (Int) -> AnyPublisher<[T], Error>) {
        // Cache of every loaded page, keyed by page number.
        let loadResults = CurrentValueSubject<[Int: [T]], Error>([:])

        // Highest cached page number; falls back to 1 while the cache is empty.
        let maxPage = loadResults
            .map { $0.keys }
            .map { $0.max() ?? 1 }

        // -1 is the sentinel for "refresh": page 1 is loaded and replaces the cache.
        let reload = ui.refresh
            .map { -1 }

        let loadNext = ui.loadNextPage
            .withLatestFrom(maxPage)
            .map { $0 + 1 }

        let start = Publishers.Merge(reload, loadNext)

        let page = start
            .flatMap { page in
                Just(page)
                    .setFailureType(to: Error.self)
                    .combineLatest(loadData(page == -1 ? 1 : page)) { (pageNumber: $0, items: $1) }
            }
            .share()

        // Feed loaded pages back into the cache. Empty pages are ignored unless they
        // come from a refresh; an empty refresh must still replace the cache,
        // otherwise the previously loaded pages would stay visible.
        page
            .filter { $0.pageNumber == -1 || !$0.items.isEmpty }
            .withLatestFrom(loadResults) { (pages: $1, newPage: $0) }
            .map { $0.newPage.pageNumber == -1 ? [1: $0.newPage.items] : $0.pages.merging([$0.newPage], uniquingKeysWith: { $1 }) }
            .print("LoadResults") // debug logging of every cache update
            .subscribe(loadResults)
            .store(in: &subscriptions)

        // +1 for every request started, -1 for every result received; loading while > 0.
        let _isLoading = Publishers.Merge(start.map { _ in 1 }, page.map { _ in -1 })
            .scan(0, +)
            .map { $0 > 0 }
            .removeDuplicates()

        // Flatten the cached pages in ascending page order.
        let _elements = loadResults
            .map { $0.sorted(by: { $0.key < $1.key }).flatMap { $0.value } }

        isLoading = _isLoading.eraseToAnyPublisher()
        elements = _elements.eraseToAnyPublisher()
    }
}
I used Shai's gist to get a custom withLatestFrom operator for Combine, which appears to work fine after testing it in multiple scenarios. However, the issue I'm facing is that the loadResults subject doesn't get filled with new pages. I placed a print("LoadResults") operator as shown in my code above, and the console prints the following output:
LoadResults: receive subscription: ((extension in TestSwiftUI):Combine.Publishers.WithLatestFrom<Combine.Publishers.Filter<Comb ... blah blah blah
LoadResults: request unlimited
as you see above, there is no LoadResults: receive value
entry in the log, which means that loadResults
subject is not filled with pages.
what's causing that weird issue ?
Regarding Combine version above, it has no issues at all, the issue I was facing was because the refreshSubject
didn't call send
in the consumer class (or the view model) to trigger fetching the first page.
Hi Daniel,
First, I want to personally thank you for this approach.
I have a question.
I'm trying to use the paginationSink like so:
let source = PaginationUISource(refresh: _refreshTrigger, loadNextPage: _nextPageTrigger)
let sink = PaginationSink(ui: source, loadData: service.getNextData(page:))
Where getNextData
is
func getNextData(page: Int) -> Observable<[TheData]>
This works fine but I want to add a parameter to getNextData
like so getNextData(category: String, page: Int) -> Observable<[TheData]>
and use it like so:
let sink = PaginationSink(ui: source, loadData: service.getNextData(section: "home", page:1))
I get Cannot convert value of type 'Observable<[TheData]>' to expected argument type '(Int) -> Observable<[_]>'
I understand the error is because of the PaginationSink
init loadData:
expects a closure of (Int) -> Observable<[T]>
I know that the solution for this maybe something silly but I'm not sure what is next step here.
Thank you
@alielsokary Try something like:
/// Factory that fixes `service` and `category` and returns a function taking only
/// the page number, which matches the `loadData` parameter of `PaginationSink`.
func getNextData(service: Service, category: String) -> (_ page: Int) -> Observable<[TheData]> {
    return { service.getNextData(category: category, page: $0) }
}
The function above will return a function you can pass to the PaginationSink.
let sink = PaginationSink(ui: source, loadData: getNextData(service: service, category: "home"))
Hi Daniel,
Thank you for your quick response. Are you sure service.getNextData(section: section, page: page) is correct? page is not a parameter of getNextData, and when I tried this in my code I got the error "Extra argument 'page' in call". Also, getNextData(section: section, page: page) expects the argument service.
@alielsokary I don't know. That's your function. I see that I used section
where your function has category
so I changed that in the above. The idea here is to make a factory function that takes the service and category as parameters and returns a function that takes the page number as a parameter, then you can use all three parameters to call your function.
It worked!
Thank you so much Daniel for your quick response and support :)
Would this work for a pagination with cursors?
@danielt1263 thanks again for a great sink! any chance of this being made into a pod?
I'm having trouble where by the pagination.isLoading
stops emitting events.
I've simplified the reproducible state code as example:
// Reproduction case: the loader answers asynchronously when a non-empty filter is
// set and synchronously otherwise.
let pagination = PaginationSink<Show>(ui: source, loadData: { [unowned showsFilter] page in
if let filter = showsFilter.value, filter.isEmpty == false {
// Non-empty filter: the single succeeds later, from a global dispatch queue.
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated).async {
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
}
return Disposables.create()
})
.asObservable()
} else {
// Nil or empty filter: the single succeeds immediately, inside `subscribe`.
return Single<[Show]>
.create(subscribe: { (single) -> Disposable in
single(.success([.init(title: "\(page) - \(showsFilter.value ?? "no filter")")]))
return Disposables.create()
})
.asObservable()
}
})
After returning filter Single, no 'loading' events are ever emitted again regardless of filter value.
What resolves the above chunk of code is starting with empty array .asObservable().startWith([])
I've tried to create a test to reflect my issue, but not so familiar with RxTest
framework to produce appropriate events.
Best I can do for now:
/// Attempts to reproduce `isLoading` going silent, using a loader that answers
/// from a background queue after a five second delay.
func testLoadingEventsError() {
// Filter value read by the loader closure; starts as nil.
let showsFilter = BehaviorRelay<String?>(value: nil)
let refreshRelay = PublishRelay<Void>.init()
// Refresh fires on every filter change after the initial value (`skip(1)`)
// and on every `refreshRelay` event.
let refreshTrigger = Observable.merge(showsFilter.skip(1).map({ _ in }), refreshRelay.asObservable())
let loadNextPageTrigger = Observable<Void>.empty()
let source = PaginationUISource(refresh: refreshTrigger.asObservable(), loadNextPage: loadNextPageTrigger.asObservable())
let expectationOne = XCTestExpectation(description: "Fetch filter")
let expectationTwo = XCTestExpectation(description: "Fetch reload")
let sink = PaginationSink(ui: source) { [unowned showsFilter] (page) -> Observable<[Int]> in
if let filter = showsFilter.value, filter.isEmpty == false {
// Non-empty filter: after 5 seconds, fulfils expectationOne and succeeds with
// the filter's characters converted to Ints (force-unwrapped).
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationOne.fulfill()
single(.success(filter.map({ Int(String($0))! }) ))
})
return Disposables.create()
})
.asObservable()
} else {
// Nil or empty filter: after 5 seconds, fulfils expectationTwo and succeeds
// with an empty array.
return Single<[Int]>
.create(subscribe: { (single) -> Disposable in
DispatchQueue.global(qos: .userInitiated)
.asyncAfter(deadline: .now() + 5, execute: {
expectationTwo.fulfill()
single(.success([]))
})
return Disposables.create()
})
.asObservable()
}
}
bag.insert(
sink.isLoading.subscribe(isLoading),
sink.elements.subscribe(elements),
sink.error.subscribe(error)
)
// Both of the following calls emit on `refreshTrigger`.
showsFilter.accept("123")
refreshRelay.accept(())
wait(for: [expectationOne, expectationTwo], timeout: 15.0)
// NOTE(review): `testScheduler.start()` is never called in this test; presumably
// that is why every expected event is at time 0. Confirm.
XCTAssertEqual(isLoading.events, [.next(0, true), .next(0, false), .next(0, true), .next(0, false)])
XCTAssertEqual(elements.events, [.next(0, [Int]()), .next(0, [1, 2, 3]), .next(0, [1, 2, 3])])
}
Thanks for this snippet 🙂 (had an issue with it but I found it came from my implementation :P)