Skip to content

Instantly share code, notes, and snippets.

@susieyy
Last active November 17, 2020 14:20
Show Gist options
  • Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
import Foundation
import Combine
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
// $ swift --version
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8)
// Target: x86_64-apple-darwin20.1.0
// $ xcodebuild -version
// Xcode 12.2
// Build version 12B5044c
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.flatMap { value in
Future<String, Never>() { promise in
DispatchQueue.global().async {
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]")
promise(.success(String(value)))
}
}
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
var cancellables: Set<AnyCancellable> = .init()
print("Workers initial count: \(workers.count)")
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
// - OUTPUT --------------------------------------------------------
// Workers initial count: 20
// Working: isMainThread [false] [0]
// Working: isMainThread [false] [1]
// Working: isMainThread [false] [2]
// Working: isMainThread [false] [3]
// Working: isMainThread [false] [4]
// Working: isMainThread [false] [5]
// Working: isMainThread [false] [6]
// Working: isMainThread [false] [7]
// Working: isMainThread [false] [8]
// Working: isMainThread [false] [9]
// Working: isMainThread [false] [11]
// Working: isMainThread [false] [12]
// Working: isMainThread [false] [10]
// Working: isMainThread [false] [13]
// Working: isMainThread [false] [14]
// Working: isMainThread [false] [15]
// Working: isMainThread [false] [16]
// Working: isMainThread [false] [17]
// Working: isMainThread [false] [18]
// Working: isMainThread [false] [19]
// Workers result count: 20
// ▿ 20 elements
// - "0"
// - "1"
// - "2"
// - "3"
// - "4"
// - "5"
// - "7"
// - "6"
// - "8"
// - "9"
// - "11"
// - "12"
// - "10"
// - "13"
// - "14"
// - "15"
// - "16"
// - "17"
// - "18"
// - "19"
// - OUTPUT --------------------------------------------------------
@susieyy
Copy link
Author

susieyy commented Nov 17, 2020

なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。

Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況

確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。

Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている

&

eraseToAnyPublisherでColdのまま動作

このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)

そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。

手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。

Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment