-
-
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
import Foundation | |
import Combine | |
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f | |
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing. | |
// $ swift --version | |
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8) | |
// Target: x86_64-apple-darwin20.1.0 | |
// $ xcodebuild -version | |
// Xcode 12.2 | |
// Build version 12B5044c | |
let workers: [AnyPublisher<String, Never>] = (0..<20).map { | |
Just<Int>($0) | |
.flatMap { value in | |
Future<String, Never>() { promise in | |
DispatchQueue.global().async { | |
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]") | |
promise(.success(String(value))) | |
} | |
} | |
.eraseToAnyPublisher() | |
} | |
.eraseToAnyPublisher() | |
} | |
var cancellables: Set<AnyCancellable> = .init() | |
print("Workers initial count: \(workers.count)") | |
Publishers | |
.MergeMany(workers) | |
.collect() | |
.receive(on: DispatchQueue.main) | |
.sink(receiveValue: { | |
print("Workers result count: \($0.count)") | |
dump($0) | |
}) | |
.store(in: &cancellables) | |
// - OUTPUT -------------------------------------------------------- | |
// Workers initial count: 20 | |
// Working: isMainThread [false] [0] | |
// Working: isMainThread [false] [1] | |
// Working: isMainThread [false] [2] | |
// Working: isMainThread [false] [3] | |
// Working: isMainThread [false] [4] | |
// Working: isMainThread [false] [5] | |
// Working: isMainThread [false] [6] | |
// Working: isMainThread [false] [7] | |
// Working: isMainThread [false] [8] | |
// Working: isMainThread [false] [9] | |
// Working: isMainThread [false] [11] | |
// Working: isMainThread [false] [12] | |
// Working: isMainThread [false] [10] | |
// Working: isMainThread [false] [13] | |
// Working: isMainThread [false] [14] | |
// Working: isMainThread [false] [15] | |
// Working: isMainThread [false] [16] | |
// Working: isMainThread [false] [17] | |
// Working: isMainThread [false] [18] | |
// Working: isMainThread [false] [19] | |
// Workers result count: 20 | |
// ▿ 20 elements | |
// - "0" | |
// - "1" | |
// - "2" | |
// - "3" | |
// - "4" | |
// - "5" | |
// - "7" | |
// - "6" | |
// - "8" | |
// - "9" | |
// - "11" | |
// - "12" | |
// - "10" | |
// - "13" | |
// - "14" | |
// - "15" | |
// - "16" | |
// - "17" | |
// - "18" | |
// - "19" | |
// - OUTPUT -------------------------------------------------------- |
なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。
Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況
確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。
Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている
&
eraseToAnyPublisherでColdのまま動作
このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)
そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。
手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。
Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。
なるほど!スレッドチェックしてませんでした。
はい。その通りだと思います。
さらに言うと、Justとmapの間にあることで
Just<Map< ... >>
にならないようにしているとも思います。解決案2: Justとmapを連続しない
それに関連する話で、実は最初に出した
.subscribe(on: DispatchQueue.global())
にこだわった解決案2があり、次のようにJustとmapを連続しないことでmapはバックグラウンドスレッドで動作するようになりますね。Justとmapを連続すると型がJust<Map< ...>>になり何が起こっているのか
この理由はあくまで予測なのですが、
Just<Map< ...>>
型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしているんだと思います。Hot化されてしまうからsubscribe(on:)
でバックグラウンドキューによる指定ができなくなるという状況なんだと予想しています(CombineではHotという概念が表面的には示されていないのであくまで「Hot的な動作」という意味です)。なぜそう思うかというと、複数のSubscriberがsubscribeした際に結果が共有されるようになるためです。検証のためにコピペで
sink
を複数でやってみます。次のような感じです。.eraseToAnyPublisher()
があるとsubscribe(on:)
で指定したバックグラウンドキューで実行される.eraseToAnyPublisher()
がないとsubscribe(on:)
で指定したバックグラウンドキューで実行されないこれ不思議なんですが、実は
.map
だけじゃなく.filter
でも同様だったと思います。このことから憶測ですがAppleのCombineは意図的にオペレータの組み合わせでHot化させて無駄を省こうとしてくれていて、それによってsubscribe(on:)
でキューを指定してもそれに応じないんじゃないかと思っています。FutureもいわゆるHotなので、キューを指定してもそれに従わないのと同じなんだと思います。整理してみます
.receive(on:)
を置く.eraseToAnyPublisher()
を置いて、.subscribe(on:)
を置くという感じでしょうか。私の予測が入り混じっているので全然自信はないです。
そして実際はアプリの開発に
Just
とmap
をやりたいわけじゃなく、本当はMergeMany
とcollect
の検証のために今回のようにJustとmap書いただけではないかと思いますので、結局Future
にしてDeferred
使うことに落ち着くかなという気もします。print
したらmap
のreceive finished
が20回あるがreceive value:
が19回の件あくまで憶測ですが
collect()
の動作が2種類ある気がします。動作的には
.subscribe(on)
されてないColdなPublisherたちを下流にあるcollect()
が扱う場合は、何かしらのタイミングで終了したという判断をしないといけないために上流がイベントを流さずにその上流ストリームも終了させられてしまう。一方、HotなPublisherもしくは.subscribe(on:)
されたColdなPublisherたちのcollect()
は、非同期実行を待つ前提であるため上流のストリームの終了を待つという感じなのかな、と思いました。思い込みかもしれませんが...。以上、長々と憶測が多くなり失礼しましたー🙇