Last active
March 19, 2020 05:23
-
-
Save popeyelau/4d352399079f2461390a331455299482 to your computer and use it in GitHub Desktop.
RxSwift Playground|-|{"files":{"RxSwift.playground":{"env":"plain"}},"tag":"Gists"}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//: [Previous](@previous) | |
//http://blog.callmewhy.com/2015/05/11/functional-reactive-programming-1/ | |
//http://blog.callmewhy.com/2015/09/21/rxswift-getting-started-0/ | |
//http://blog.callmewhy.com/2015/09/23/rxswift-getting-started-1/ | |
import RxSwift | |
let disposeBag = DisposeBag() | |
//========================Observable======================== | |
// 创建一个不会发出任何何事的序列 | |
Observable<String>.never().subscribe({ | |
print($0) | |
}) | |
// 创建公一个元素的序列 onNext,onComplete | |
Observable<String>.just("hello").subscribe { | |
print($0) | |
} | |
// 使用固定数量元素合建序列 | |
Observable.of(1,2,3,4,5).subscribe { | |
print($0) | |
} | |
// 使用一个SequenceType对象创建一个序列 比如一个Array/Dictionary/Set... | |
Observable.from([1,2,3]).subscribe { | |
print($0) | |
} | |
// 创建自定义序列 | |
let custom = { (element: String) -> Observable<String> in | |
return Observable<String>.create({ (observer) -> Disposable in | |
observer.onNext(element) | |
observer.onCompleted() | |
return Disposables.create() | |
}) | |
} | |
custom("Hello world").subscribe { | |
print($0) | |
} | |
// 创建一个不确定次数的指定元素序列 | |
Observable.repeatElement("🅰️") | |
.take(3) //take操作返回一个从序列开始到指定数字的元素数组 | |
.subscribe { | |
print($0) | |
} | |
// 创建一个当条件为true时就生成值的序列 ( initial < 2 ? (onNext(initial) then initial + 1) : onComplete) | |
let initial = 0 | |
Observable.generate(initialState: initial, condition: { $0 < 2 }, iterate: {$0 + 1 }) | |
.subscribe { | |
print($0) | |
} | |
//deferred 会等到有订阅者的时候再通过工厂方法创建 Observable 对象,每个订阅者订阅的对象都是内容相同而完全独立的序列 | |
let deferred = Observable<String>.deferred { | |
return Observable<String>.create({ (observer) -> Disposable in | |
observer.onNext("A") | |
observer.onNext("B") | |
observer.onCompleted() | |
return Disposables.create() | |
}) | |
} | |
deferred.subscribe { | |
print($0) | |
} | |
deferred.subscribe { | |
print($0) | |
} | |
//为什么需要 defferd 这样一个奇怪的家伙呢?其实这相当于是一种延时加载,因为在添加监听的时候数据未必加载完毕 | |
//e.g. | |
var str: String? = nil | |
let strObserver: Observable<String?> = Observable.just(str) | |
let strDeferred: Observable<String?> = Observable.deferred{ | |
return Observable.just(str) | |
} | |
str = "hello world" | |
strObserver.subscribe { | |
print($0) | |
} | |
strDeferred.subscribe({ | |
print($0) | |
}) | |
// 为每一个事件调用附带动作 | |
Observable.of(1,2,3) | |
.do( | |
onNext: { print("do \($0)")} | |
) | |
.subscribe { | |
print($0) | |
} | |
//: [Next](@next) | |
//========================Subject======================== | |
/* | |
简单的比喻下,Observable像是一个水管,会源源不断的有水冒出来。Subject就像一个水龙头,它可以套在水管上,接受Observable上面的事件。但是作为水龙头,它下面还可以被别的observer给subscribe了。 | |
*/ | |
// PublishSubject它仅仅会发送observer订阅之后的事件,也就是说如果sequence上有.Next 的到来,但是这个时候某个observer还没有subscribe它,这个observer就收不到这条信息,它只会收到它订阅之后发生的事件 | |
let subject = PublishSubject<String>() | |
subject.onNext("🐶") | |
subject.onNext("🐱") | |
// 之前onNext会被忽略掉 | |
subject.subscribe(onNext: { (value) in | |
print(value) | |
}) | |
subject.onNext("🅰️") | |
subject.onNext("🅱️") | |
//ReplaySubject它和PublishSubject不同之处在于它不会漏消息。即使observer在subscribe的时候已经有事件发生过了,它也会收到之前的事件序列。 | |
//指定bufferSize数量的 在新订阅者订阅之前广播的 事件 发送到新订阅者上 | |
let replay = ReplaySubject<String>.create(bufferSize: 2) | |
replay.onNext("A") | |
replay.onNext("B") | |
replay.onNext("C") | |
replay.subscribe { | |
print($0) | |
} | |
replay.onNext("D") | |
// BehaviorSubject 当有observer在订阅一个BehaviorSubject的时候,它首先将会收到Observable上最近发送一个信号(或者是默认值),接着才会收到Observable上会发送的序列。 | |
let behavior = BehaviorSubject<Int>(value: 0) | |
behavior.onNext(1) | |
behavior.onNext(2) | |
behavior.onNext(3) // 订阅前最近发送的值 | |
behavior.subscribe { | |
print($0) | |
} | |
behavior.onNext(4) | |
// Variable是BehaviorSubject的封装,它和BehaviorSubject不同之处在于,不能向Variable发送.Complete和.Error,它会在生命周期结束被释放的时候自动发送.Complete。 | |
// 调用Variable实例的asObservable()方法是为了访问BehaviorSubject序列的底层。Variable不会实现on操作(或者 onNext(_:)),而是暴露一个可以获取当前值的属性。设置一个新值还会添加这个值到它的底层BehaviorSubject序列 | |
let magicVariable = Variable(12) | |
magicVariable.asObservable().skip(1).subscribe(onNext: { (value) in | |
print(value) | |
}) | |
// distinctUntilChange 过滤掉重复的序列值 | |
magicVariable.asObservable().distinctUntilChanged().subscribe(onNext: { (value) in | |
print("distinctUntilChanged: \(value)") | |
}) | |
magicVariable.value = 12 | |
magicVariable.value = 12 | |
magicVariable.value = 13 | |
//========================Operators======================== | |
//========================1.Combination======================== | |
//联合操作将多个源Observable结合成一个Observable | |
//startWith 体现了后进先出的原则,那就是说,每一个连续的startWith元素将会在优先的startWith元素之前被考虑 | |
Observable.of("1","2") | |
.startWith("0") | |
.subscribe { | |
print($0) | |
} | |
//merge 结合多个源Observable序列中的元素成为一个新的Observable序列,并且将会发出每一个元素,事实上是通过每个源Observable序列发出的 | |
let subject1 = PublishSubject<String>() | |
let subject2 = PublishSubject<String>() | |
let merged = Observable.of(subject1,subject2).merge() | |
merged.subscribe { | |
print($0) | |
} | |
subject1.onNext("A") | |
subject1.onNext("B") | |
subject2.onNext("🆎") | |
subject1.onNext("C") | |
subject2.onNext("D") | |
//zip 对应现实中的例子就是拉链,拉链需要两个元素这样才能拉上去,这里也一样,只有当两个 Observable 都有了新的值时,subscribe 才会被触发。 | |
let stringSubject = PublishSubject<String>() | |
let intSubject = PublishSubject<Int>() | |
Observable.zip(stringSubject, intSubject) { stringElement, intElement in | |
"\(stringElement) \(intElement)" | |
} | |
.subscribe(onNext: { print("zip: \($0)") }) | |
.addDisposableTo(disposeBag) | |
stringSubject.onNext("🅰️") | |
stringSubject.onNext("🅱️") | |
stringSubject.onNext("🆎") //只有当两个 Observable 都有了新的值时,subscribe 才会被触发 | |
intSubject.onNext(1) | |
intSubject.onNext(1) | |
//combineLatest | |
//http://reactivex.io/documentation/operators/combinelatest.html | |
//将源Observable组合成一个新的Observable序列 并且一旦所有源序列发出至少一个元素以及当任何源Observable序列发出一个新的元素的时候,将开始发散组合的Observable序列的最新的元素。 | |
let stringSubject1 = PublishSubject<String>() | |
let intSubject1 = PublishSubject<Int>() | |
Observable.combineLatest(stringSubject1, intSubject1) {(string, int) in "\(string)\(int)" } | |
.subscribe { | |
print("combineLatest: \($0)") | |
} | |
stringSubject1.onNext("🅰️") | |
stringSubject1.onNext("🅱️") | |
intSubject1.onNext(1) | |
intSubject1.onNext(2) | |
stringSubject1.onNext("🆎") | |
intSubject1.onNext(3) | |
intSubject1.onNext(4) | |
// e.g. 根据费用项目的变化,自动计算费用合计 | |
let charge1 = Variable<Double>(0.0) | |
let charge2 = Variable<Double>(0.0) | |
let charge3 = Variable<Double>(0.0) | |
let charge4 = Variable<Double>(0.0) | |
Observable.combineLatest(charge1.asObservable(), charge2.asObservable(), charge3.asObservable(), charge4.asObservable()) { (charge1, charge2, charge3, charge4) in | |
charge1 + charge2 + charge3 + charge4 | |
}.subscribe { | |
print("SUM: \($0)") | |
} | |
charge1.value = 2.2 | |
charge2.value = 2.8 | |
charge3.value = 5.0 | |
// switchLatest 通过一个在Observable序列队里的Observable序列发出转换元素,并且发出最近的Observable序列里的元素。 | |
let aSubject = BehaviorSubject(value: "A") | |
let bSubject = BehaviorSubject(value: "B") | |
let aVariable = Variable(aSubject) | |
aVariable.asObservable().switchLatest().subscribe { | |
print($0) | |
} | |
aSubject.onNext("B") | |
aSubject.onNext("C") | |
//这里将bSubject赋值给了aVariable, aSubject后续发送的事件都会被舍弃掉.并接收bSubject(最近)的事件 | |
aVariable.value = bSubject | |
bSubject.onNext("D") | |
aSubject.onNext("E") | |
//========================2.Transforming======================== | |
// map 应用一个转换闭包于通过一个Observable序列发散的元素,并且返回一个新的转换过元素的Observable序列。 | |
Observable.of(1,2,3) | |
.map { $0 * 2 } | |
.subscribe { | |
print($0) | |
} | |
//flatMap | |
/* | |
map在做转换的时候很容易出现『升维』的情况,即:转变之后,从一个序列变成了一个序列的序列。 | |
什么是『升维』?在集合中我们可以举这样一个例子,我有一个好友列表 [p1, p2, p3],那么如果要获取我好友的好友的列表,可以这样做: | |
myFriends.map { $0.getFriends() } | |
结果就成了 [[p1-1, p1-2, p1-3], [p2-1], [p3-1, p3-2]] ,这就成了好友的好友列表的列表了。这就是一个『升维』的例子。 | |
在 Swift 中,我们可以用 flatMap 过滤掉 map 之后的 nil 结果。 | |
在 Rx 中, flatMap 可以把一个序列转换成一组序列,然后再把这一组序列『拍扁』成一个序列。 */ | |
Observable.of(0,1,2) | |
.map { (x) -> Observable<String> in | |
return Observable.just(String(x)) | |
}.subscribe { | |
print($0) | |
} | |
Observable.of(0,1,2) | |
.flatMap { (x) -> Observable<String> in | |
return Observable.just(String(x)) | |
}.subscribe { | |
print($0) | |
} | |
//flatMapLetest flatMap和flatMapLatest不同的是,flatMapLatest只会发出最新的内部Observable序列的元素。 | |
struct Player { | |
var score: Variable<Int> | |
} | |
let popeye = Player(score: Variable(80)) | |
let oyl = Player(score: Variable(90)) | |
let player = Variable(popeye) | |
player.asObservable() | |
//map -> RxSwift.BehaviorSubject<Swift.Int> | |
//flatMap -> Int | |
//flatMapLetest | |
.flatMapLatest { $0.score.asObservable() } | |
.subscribe { print ($0) } | |
/* flatMapLatest实际上是map和switchLatest操作的组合。 | |
player.asObservable() | |
.map { $0.score.asObservable() } | |
.switchLatest() | |
.subscribe { print ($0) } */ | |
popeye.score.value = 85 | |
//这里将player赋值给了oyl, 如果是 flatMapLetest, 那么popeye后续发送的事件都会被舍弃掉.并接收oyl(最近)的事件 | |
player.value = oyl | |
popeye.score.value = 91 | |
oyl.score.value = 92 | |
//scan 有点像 reduce, 它会把每次的运算结果都累积起来,作为下一次运输的输入值 | |
let numbers = [1,2,3,4,5] | |
print(numbers.reduce(0) { $0 + $1 }) | |
Observable.from(numbers) | |
.scan(0) { $0 + $1} | |
.subscribe { print($0) } | |
//========================3.Filtering & Conditional Operators======================== | |
//有选择地从源Observable序列发散元素的操作。 | |
//filter 只会让符合条件的元素通过 | |
//只订阅偶数 | |
Observable.of(1,2,3,4,5) | |
.filter { $0 % 2 == 0} | |
.subscribe{ print($0) } | |
//distinctUntilChanged 会废弃掉重复的事件 | |
Observable.of(1,2,2,3,3,4,4,5) | |
.distinctUntilChanged() | |
.subscribe{ print($0) } | |
//elementAt 发散一个源Observable序列中指定索引的元素 | |
Observable.of(1,2,3) | |
.elementAt(2) | |
.subscribe{ print($0) } | |
//single 发出一个源Observable序列中的第一个元素(或者满足条件的第一个元素)。 | |
//如果这个源Observable序列不能发出一个元素或者根据条件查找到两个以上的元素将会抛出一个错误。 | |
Observable.of(1) | |
.single() | |
.subscribe{ print($0) } | |
Observable.of(1) | |
.single{ $0 > 2 } //这里没有 > 2 的元素, 将会抛出一个错误error(Sequence doesn't contain any elements.) | |
.subscribe{ print($0) } | |
Observable.of(1,2,3,4) //这里 > 2 的元素有 3,4 将会抛出一个错误error(Sequence contains more than one element.) | |
.single { $0 > 2} | |
.subscribe{ print($0) } | |
//take 从一个源Observable序列的开始元素发散指定数量的元素 | |
Observable.of(1,2,3,4) | |
.take(2) | |
.subscribe{ print($0) } | |
//takeLast 从一个源Observable序列的结尾元素发散指定数量的元素。 | |
Observable.of(1,2,3,4) | |
.takeLast(2) | |
.subscribe{ print($0) } | |
//takeWhile 发出一个源Observable序列中的只要指定条件的值为true的元素 | |
Observable.of(1,2,3,4) | |
.takeWhile { $0 != 3 } // 当$0 == 3 时,后续的事件就会被舍弃掉 | |
.subscribe{ print($0) } | |
//takeUntil 其实就是 take ,它会在终于等到那个事件之后触发 .Completed 事件 | |
let refObservable = PublishSubject<String>() | |
let sourceObservable = PublishSubject<String>() | |
sourceObservable | |
.takeUntil(refObservable) | |
.subscribe{ print($0) } | |
sourceObservable.onNext("A") | |
sourceObservable.onNext("B") | |
refObservable.onNext("终于等到你") | |
sourceObservable.onNext("C") //不在执行 | |
//skip 跳过指定数量的元素 | |
Observable.of(1,2,3) | |
.skip(2) | |
.subscribe{ print($0) } | |
//skipWhile 跳过满足条件的元素 | |
Observable.of(1,2,3) | |
.skipWhile { $0 < 2} | |
.subscribe{ print($0) } | |
//skipWhileWithIndex 跳过满足条件的索引的元素。这个闭包查询了每个元素 | |
Observable.of(1,2,3) | |
.skipWhileWithIndex({ (element, index) -> Bool in | |
index < 1 | |
}) | |
.subscribe{ print($0) } | |
//skipUntil 其实就是 skip ,它会在终于等到那个事件之后 才会响应事件 | |
//========================4.Mathematical and Aggregate Operators======================== | |
//数学和集合操作 | |
//contact 多个事件序列合并起来 | |
let v1 = BehaviorSubject(value: 100) | |
let v2 = BehaviorSubject(value: 200) | |
v1 | |
.concat(v2) | |
.subscribe { print($0) } | |
v1.onNext(101) | |
v1.onCompleted() | |
v2.onNext(201) | |
v1.onNext(102) | |
// reduce 这里的 reduce 和 CollectionType 中的 reduce 是一个意思,都是指通过对一系列数据的运算最后生成一个结果 | |
Observable.of(1,2,3) | |
.reduce(0) { $0 + $1 } | |
.subscribe { print($0) } | |
//========================5.Connectable Operators======================== | |
//可连接的操作 | |
//========================6.Error Handling Operators======================== | |
//错误处理操作 | |
enum BusinessError : Error { | |
case test | |
} | |
// catchErrorJustReturn onError时,用一个默认值替代,并发送 .onComplete | |
let subj1 = PublishSubject<String>() | |
subj1 | |
.catchErrorJustReturn("Jesus!") | |
.subscribe { print($0) } | |
subj1.onNext("Hello") | |
subj1.onNext("Swift") | |
subj1.onError(BusinessError.test) | |
//catchError 可以捕获异常事件,并且在后面无缝接上另一段事件序列,丝毫没有异常的痕迹 | |
let subj2 = PublishSubject<String>() | |
let subj3 = Observable.of("XX","ZZ") | |
subj2 | |
.catchError { (error) -> Observable<String> in | |
return subj3 | |
} | |
.subscribe { print($0) } | |
subj2.onNext("NO ERROR") | |
subj2.onError(BusinessError.test) | |
//retry 顾名思义,就是在出现异常的时候会再去从头订阅事件序列,妄图通过『从头再来』解决异常 | |
var count = 1 | |
let sequenceError = Observable<String>.create { (observer) -> Disposable in | |
observer.onNext("A") | |
observer.onNext("B") | |
if count == 1 { | |
observer.onError(BusinessError.test) | |
print("retry ...............") | |
count += 1 | |
} | |
observer.onNext("C") | |
observer.onNext("D") | |
return Disposables.create() | |
} | |
sequenceError | |
.retry() //.retry(3) 最多尝试3次 | |
.subscribe { print($0) } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment