Last active
November 17, 2020 14:20
-
-
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Combine | |
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f | |
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing. | |
// $ swift --version | |
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8) | |
// Target: x86_64-apple-darwin20.1.0 | |
// $ xcodebuild -version | |
// Xcode 12.2 | |
// Build version 12B5044c | |
let workers: [AnyPublisher<String, Never>] = (0..<20).map { | |
Just<Int>($0) | |
.flatMap { value in | |
Future<String, Never>() { promise in | |
DispatchQueue.global().async { | |
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]") | |
promise(.success(String(value))) | |
} | |
} | |
.eraseToAnyPublisher() | |
} | |
.eraseToAnyPublisher() | |
} | |
var cancellables: Set<AnyCancellable> = .init() | |
print("Workers initial count: \(workers.count)") | |
Publishers | |
.MergeMany(workers) | |
.collect() | |
.receive(on: DispatchQueue.main) | |
.sink(receiveValue: { | |
print("Workers result count: \($0.count)") | |
dump($0) | |
}) | |
.store(in: &cancellables) | |
// - OUTPUT -------------------------------------------------------- | |
// Workers initial count: 20 | |
// Working: isMainThread [false] [0] | |
// Working: isMainThread [false] [1] | |
// Working: isMainThread [false] [2] | |
// Working: isMainThread [false] [3] | |
// Working: isMainThread [false] [4] | |
// Working: isMainThread [false] [5] | |
// Working: isMainThread [false] [6] | |
// Working: isMainThread [false] [7] | |
// Working: isMainThread [false] [8] | |
// Working: isMainThread [false] [9] | |
// Working: isMainThread [false] [11] | |
// Working: isMainThread [false] [12] | |
// Working: isMainThread [false] [10] | |
// Working: isMainThread [false] [13] | |
// Working: isMainThread [false] [14] | |
// Working: isMainThread [false] [15] | |
// Working: isMainThread [false] [16] | |
// Working: isMainThread [false] [17] | |
// Working: isMainThread [false] [18] | |
// Working: isMainThread [false] [19] | |
// Workers result count: 20 | |
// ▿ 20 elements | |
// - "0" | |
// - "1" | |
// - "2" | |
// - "3" | |
// - "4" | |
// - "5" | |
// - "7" | |
// - "6" | |
// - "8" | |
// - "9" | |
// - "11" | |
// - "12" | |
// - "10" | |
// - "13" | |
// - "14" | |
// - "15" | |
// - "16" | |
// - "17" | |
// - "18" | |
// - "19" | |
// - OUTPUT -------------------------------------------------------- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。
確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。
&
このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)
手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。
Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。