Skip to content

Instantly share code, notes, and snippets.

@r-plus
Last active August 13, 2017 07:32
Show Gist options
  • Save r-plus/b97dee25ea66765b97d65c792434433b to your computer and use it in GitHub Desktop.
Save r-plus/b97dee25ea66765b97d65c792434433b to your computer and use it in GitHub Desktop.
SequentiallyAll processing for RxSwift migrate from Promise.

SequentiallyAll

Promise

allだとすべてのPromiseが同時に実行状態になり、100程のPromiseをallしたい場合リソースを大量食いするので並列度を抑えて逐次的に処理しつつ全部が終わったら次の処理をしたい。
SwiftTaskではallに渡すTaskを全てpause状態で作成しておいてon(success)でまだ実行されていないTaskをresumeして実現
Hydraでは同じ実装をPRして入っている https://github.com/malcommac/Hydra/blob/master/Sources/Hydra/Promise%2BAll.swift#L47

RxSwift

zipかcombineLatestで渡した全てのObservableのイベントが揃うまで待てる(Promise的all)
どちらにしても渡した全Observableはsubscribeされてしまうので、Observable.createのclosureはその段階で実行されてしまう。
OperationQueueを利用するスケジューラであるOperationQueueSchedulerを作成しzipのあとにsubscribeOnしてみたが全Observableが一気にsubscribeされてしまう事に変わりはなかった。

なのでcreateの中でスケジューラでのスケジュール実行させる事で逐次処理できた。

NOTE: Observable.createしたObservableをretryするとリトライできるがsubscribeは全部一気にされるのでキューには既に100まで埋まっている。onErrorしたObservableはsubscribeされなおすのでキューの最後になる点に注意。

let operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = 2
let scheduler = OperationQueueScheduler(operationQueue: operationQueue)
var a = [Observable<Int>]()
for i in 0...100 {
a.append(
Observable<Int>.create { obs -> Disposable in
let boolDisp = BooleanDisposable()
let disposable = scheduler.schedule(()) { what in
print("start download \(i): \(Date())")
if i == 15 {
let err = NSError(domain: "d", code: -1, userInfo: nil)
obs.onError(err)
return Disposables.create()
}
let idx = Int(arc4random_uniform(UInt32(4)))
sleep([1,3,6,8][idx])
print("end download \(i): \(Date())")
guard !boolDisp.isDisposed else {
return Disposables.create()
}
obs.on(.next(i))
obs.onCompleted()
return Disposables.create()
}
return CompositeDisposable(disposable, boolDisp)
}.debug()
)
}
Observable.zip(a)
.subscribe(onNext: { (result) in
print(result)
}, onError: { (err) in
print(err)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment