Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save dsxsxsxs/971ebb00a09404896ec20df75bb1b0de to your computer and use it in GitHub Desktop.
Save dsxsxsxs/971ebb00a09404896ec20df75bb1b0de to your computer and use it in GitHub Desktop.
In.comparision.to.RxSwift.flatMapFirst.alternatives.for.combine.swift
// Created by dsxsxsxs on 2020/09/15.
// Copyright © 2020 me.dsxsxsxs.rx.playground. All rights reserved.
//
import RxSwift
import RxRelay
import RxTest
import XCTest
import Combine
@testable import RxPlayground
/// Compares RxSwift's `flatMapFirst` with candidate Combine equivalents.
///
/// Every scenario pushes values through an inner sequence that delays each value
/// by five seconds, then asserts which values were delivered and at what time.
/// The Rx tests run on a virtual-time `TestScheduler`. The Combine tests run in
/// real time on a `DispatchQueue` and check their results 15 seconds after start.
class FlatMapTest: XCTestCase {

    /// A delivered value (`event`) paired with the whole number of seconds
    /// (`time`) that had elapsed since the test started when it arrived.
    struct EventAndTimestamp: Equatable {
        let time: Int
        let event: Int
    }

    /// Returns the whole seconds elapsed between `start` and now.
    ///
    /// Uses `uptimeNanoseconds` instead of `rawValue`: `rawValue` is an opaque
    /// tick count that is only equal to nanoseconds on hardware whose Mach
    /// timebase is 1:1, so dividing it by 1e9 gives wrong results elsewhere.
    ///
    /// - Parameter start: The reference instant; must not be later than now.
    /// - Returns: Elapsed time truncated to whole seconds.
    private static func wholeSeconds(since start: DispatchTime) -> Int {
        let elapsedNanoseconds = DispatchTime.now().uptimeNanoseconds - start.uptimeNanoseconds
        return Int(elapsedNanoseconds / 1_000_000_000)
    }

    // MARK: - RxSwift reference behaviour

    /// Reference behaviour: `flatMapFirst` fed by a relay that fires at virtual
    /// times 0, 1, 2 and 8. Only values 0 and 3 are expected to come out.
    func testRxFlatMapFirstWithTrigger() {
        let scheduler = TestScheduler(initialClock: 0)
        // Emits `t` five virtual seconds after subscription.
        func delay5<T>(_ t: T) -> Single<T> {
            Single.just(t)
                .delay(.seconds(5), scheduler: scheduler)
        }
        let trigger = PublishRelay<Int>()
        let first = trigger.flatMapFirst { delay5($0) }
        let resultObserver = scheduler.createObserver(Int.self)
        _ = first.bind(to: resultObserver)

        scheduler.scheduleAt(0, action: { trigger.accept(0) })
        scheduler.scheduleAt(1, action: { trigger.accept(1) })
        scheduler.scheduleAt(2, action: { trigger.accept(2) })
        scheduler.scheduleAt(8, action: { trigger.accept(3) })
        scheduler.start()

        XCTAssertEqual(resultObserver.events, [
            .next(5, 0),
            .next(13, 3)
        ])
    }

    /// Reference behaviour: `flatMapFirst` fed by a one-second interval limited
    /// to ten values. Values 0 and 6 are expected, followed by completion.
    func testRxFlatMapFirstWithInterval() {
        let scheduler = TestScheduler(initialClock: 0)
        // Emits `t` five virtual seconds after subscription.
        func delay5<T>(_ t: T) -> Single<T> {
            Single.just(t)
                .delay(.seconds(5), scheduler: scheduler)
        }
        let first = Observable<Int>.interval(.seconds(1), scheduler: scheduler)
            .take(10)
            .flatMapFirst { delay5($0) }
        let resultObserver = scheduler.createObserver(Int.self)
        // The subscription is intentionally not retained, matching the trigger test.
        _ = first.bind(to: resultObserver)

        scheduler.start()

        XCTAssertEqual(resultObserver.events, [
            .next(6, 0),
            .next(12, 6),
            .completed(13)
        ])
    }

    // MARK: - Combine candidates

    /// Candidate: `flatMap(maxPublishers: .max(1))` fed by a `PassthroughSubject`
    /// that fires at 0, 1, 2 and 8 seconds. The assertion expects the same
    /// result as the Rx trigger test: values 0 and 3, at 5 and 13 seconds.
    func testCombineFlatMapMax1WithTrigger() {
        let scheduler = DispatchQueue(label: "abcd")
        // Emits `t` five seconds later on `scheduler`.
        func delay5<T>(_ t: T) -> AnyPublisher<T, Never> {
            Just(t)
                .delay(for: .seconds(5), scheduler: scheduler)
                .eraseToAnyPublisher()
        }
        let expectation = XCTestExpectation(description: "results checked after 15 seconds")
        let trigger = PassthroughSubject<Int, Never>()
        let now = DispatchTime.now()
        var cancellables: [AnyCancellable] = []
        var collectedEventAndTimestamp: [EventAndTimestamp] = []

        let first = trigger
            .flatMap(maxPublishers: .max(1)) { delay5($0) }
        // Failure is `Never`, so the value-only `sink` overload is sufficient.
        first
            .sink { event in
                collectedEventAndTimestamp.append(
                    .init(time: FlatMapTest.wholeSeconds(since: now), event: event)
                )
            }
            .store(in: &cancellables)

        // Sends, deliveries and the final check are all submitted to the
        // same queue, `scheduler`.
        scheduler.asyncAfter(deadline: now) { trigger.send(0) }
        scheduler.asyncAfter(deadline: now + .seconds(1)) { trigger.send(1) }
        scheduler.asyncAfter(deadline: now + .seconds(2)) { trigger.send(2) }
        scheduler.asyncAfter(deadline: now + .seconds(8)) { trigger.send(3) }
        scheduler.asyncAfter(deadline: now + .seconds(15)) {
            XCTAssertEqual(collectedEventAndTimestamp, [
                .init(time: 5, event: 0),
                .init(time: 13, event: 3)
            ])
            expectation.fulfill()
        }
        wait(for: [expectation], timeout: 100)
    }

    /// Candidate: `flatMap(maxPublishers: .max(1))` fed by a one-second timer
    /// zipped with the sequence `0..<10`.
    ///
    /// The assertion expects values 0 and 1 (at 6 and 12 seconds), which differs
    /// from the Rx interval test, where values 0 and 6 are expected.
    func testCombineFlatMapMax1WithInterval() {
        let scheduler = DispatchQueue(label: "abcd")
        // Emits `t` five seconds later on `scheduler`.
        func delay5<T>(_ t: T) -> AnyPublisher<T, Never> {
            Just(t)
                .delay(for: .seconds(5), scheduler: scheduler)
                .eraseToAnyPublisher()
        }
        let expectation = XCTestExpectation(description: "results checked after 15 seconds")
        let now = DispatchTime.now()
        var cancellables: [AnyCancellable] = []
        let timer = Timer.TimerPublisher(interval: 1, runLoop: .main, mode: .default)
        let first = timer
            .zip((0..<10).publisher)
            .map { $1 }
            .prefix(10)
            .flatMap(maxPublishers: .max(1)) { delay5(Int($0)) }
        var collectedEventAndTimestamp: [EventAndTimestamp] = []

        timer.connect().store(in: &cancellables)
        // Failure is `Never`, so the value-only `sink` overload is sufficient.
        first
            .sink { event in
                collectedEventAndTimestamp.append(
                    .init(time: FlatMapTest.wholeSeconds(since: now), event: event)
                )
            }
            .store(in: &cancellables)

        scheduler.asyncAfter(deadline: now + .seconds(15)) {
            XCTAssertEqual(collectedEventAndTimestamp, [
                .init(time: 6, event: 0),
                .init(time: 12, event: 1)
            ])
            expectation.fulfill()
        }
        wait(for: [expectation], timeout: 50)
    }

    /// Candidate: `buffer(size: 1, whenFull: .dropNewest)` followed by a plain
    /// `flatMap`, fed by the same trigger schedule as the other trigger tests.
    ///
    /// Per the original author's notes this combination does not reproduce
    /// `flatMapFirst`, and the assertion below is known to fail.
    func testCombineFlatMapBufferWithTrigger() {
        let scheduler = DispatchQueue(label: "abcd")
        // Emits `t` five seconds later on `scheduler`.
        func delay5<T>(_ t: T) -> AnyPublisher<T, Never> {
            Just(t)
                .delay(for: .seconds(5), scheduler: scheduler)
                .eraseToAnyPublisher()
        }
        let expectation = XCTestExpectation(description: "results checked after 15 seconds")
        let trigger = PassthroughSubject<Int, Never>()
        let now = DispatchTime.now()
        var cancellables: [AnyCancellable] = []
        var collectedEventAndTimestamp: [EventAndTimestamp] = []

        let first = trigger
            // buffer + flatMap does not work; it cannot produce the expected behaviour.
            // NOTE(review): presumably because this `flatMap` uses its default
            // unlimited demand, so the buffer never fills — confirm.
            .buffer(size: 1, prefetch: .byRequest, whenFull: .dropNewest)
            .flatMap { delay5($0) }
        // Failure is `Never`, so the value-only `sink` overload is sufficient.
        first
            .sink { event in
                collectedEventAndTimestamp.append(
                    .init(time: FlatMapTest.wholeSeconds(since: now), event: event)
                )
            }
            .store(in: &cancellables)

        scheduler.asyncAfter(deadline: now) { trigger.send(0) }
        scheduler.asyncAfter(deadline: now + .seconds(1)) { trigger.send(1) }
        scheduler.asyncAfter(deadline: now + .seconds(2)) { trigger.send(2) }
        scheduler.asyncAfter(deadline: now + .seconds(8)) { trigger.send(3) }
        scheduler.asyncAfter(deadline: now + .seconds(15)) {
            // This test fails.
            XCTAssertEqual(collectedEventAndTimestamp, [
                .init(time: 5, event: 0),
                .init(time: 13, event: 3)
            ])
            expectation.fulfill()
        }
        wait(for: [expectation], timeout: 100)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment