Skip to content

Instantly share code, notes, and snippets.

@susieyy
Last active November 17, 2020 14:20
Show Gist options
  • Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Save susieyy/95e8f55fd077162c7e02c13b541e8309 to your computer and use it in GitHub Desktop.
Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
import Foundation
import Combine
// see: not working https://gist.github.com/susieyy/3e1059d5dc9d0f2278db07e3fb73332f
// Use Future to work around an issue where the Combine's MergeMany operator may not perform background processing.
// $ swift --version
// Apple Swift version 5.3.1 (swiftlang-1200.0.41 clang-1200.0.32.8)
// Target: x86_64-apple-darwin20.1.0
// $ xcodebuild -version
// Xcode 12.2
// Build version 12B5044c
let workers: [AnyPublisher<String, Never>] = (0..<20).map {
Just<Int>($0)
.flatMap { value in
Future<String, Never>() { promise in
DispatchQueue.global().async {
print("Working: isMainThread [\(Thread.isMainThread)] [\(value)]")
promise(.success(String(value)))
}
}
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
var cancellables: Set<AnyCancellable> = .init()
print("Workers initial count: \(workers.count)")
Publishers
.MergeMany(workers)
.collect()
.receive(on: DispatchQueue.main)
.sink(receiveValue: {
print("Workers result count: \($0.count)")
dump($0)
})
.store(in: &cancellables)
// - OUTPUT --------------------------------------------------------
// Workers initial count: 20
// Working: isMainThread [false] [0]
// Working: isMainThread [false] [1]
// Working: isMainThread [false] [2]
// Working: isMainThread [false] [3]
// Working: isMainThread [false] [4]
// Working: isMainThread [false] [5]
// Working: isMainThread [false] [6]
// Working: isMainThread [false] [7]
// Working: isMainThread [false] [8]
// Working: isMainThread [false] [9]
// Working: isMainThread [false] [11]
// Working: isMainThread [false] [12]
// Working: isMainThread [false] [10]
// Working: isMainThread [false] [13]
// Working: isMainThread [false] [14]
// Working: isMainThread [false] [15]
// Working: isMainThread [false] [16]
// Working: isMainThread [false] [17]
// Working: isMainThread [false] [18]
// Working: isMainThread [false] [19]
// Workers result count: 20
// ▿ 20 elements
// - "0"
// - "1"
// - "2"
// - "3"
// - "4"
// - "5"
// - "7"
// - "6"
// - "8"
// - "9"
// - "11"
// - "12"
// - "10"
// - "13"
// - "14"
// - "15"
// - "16"
// - "17"
// - "18"
// - "19"
// - OUTPUT --------------------------------------------------------
@yimajo
Copy link

yimajo commented Nov 16, 2020

なるほど!スレッドチェックしてませんでした。

これはJustがバックグラウンドスレッドで実施されて、続くmapもバックグラウンドスレッドで実施されるという解釈なのかな。

はい。その通りだと思います。

さらに言うと、Justとmapの間にあることでJust<Map< ... >>にならないようにしているとも思います。

解決案2: Justとmapを連続しない

それに関連する話で、実は最初に出した .subscribe(on: DispatchQueue.global())にこだわった解決案2があり、次のようにJustとmapを連続しないことでmapはバックグラウンドスレッドで動作するようになりますね。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .eraseToAnyPublisher() // ここでJust<Map< ...> >になるのを防ぎAnyPublisherにしておく
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

Justとmapを連続すると型がJust<Map< ...>>になり何が起こっているのか

この理由はあくまで予測なのですが、Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしているんだと思います。Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況なんだと予想しています(CombineではHotという概念が表面的には示されていないのであくまで「Hot的な動作」という意味です)。

なぜそう思うかというと、複数のSubscriberがsubscribeした際に結果が共有されるようになるためです。検証のためにコピペでsinkを複数でやってみます。次のような感じです。

let workers: [AnyPublisher<String, Never>] = (0..<20).map {
    Just<Int>($0)
        .eraseToAnyPublisher() // この部分があるかないかでmapの動作回数も変わります
        .map {
            print("Working: isMainThread [\(Thread.isMainThread)] [\($0)]")
            return String($0)
        }
        .subscribe(on: DispatchQueue.global())
        .eraseToAnyPublisher()
}

var cancellables: Set<AnyCancellable> = .init()

Publishers
    .MergeMany(workers)
    .collect()
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: {
        print("Workers result count: \($0.count)")
        dump($0)
    })
    .store(in: &cancellables)

Publishers
    .MergeMany(workers)
    .collect()
    .receive(on: DispatchQueue.main)
    .sink(receiveValue: {
        print("Workers result count: \($0.count)")
        dump($0)
    })
    .store(in: &cancellables)
  • .eraseToAnyPublisher()があると
    • map処理が40回になる
      • subscribeされる数を2倍にしたので
    • subscribe(on:)で指定したバックグラウンドキューで実行される
  • .eraseToAnyPublisher()がないと
    • map処理は20回以下になる
      • subscribeされた数を気にしなくなる
    • subscribe(on:)で指定したバックグラウンドキューで実行されない

これ不思議なんですが、実は.mapだけじゃなく.filterでも同様だったと思います。このことから憶測ですがAppleのCombineは意図的にオペレータの組み合わせでHot化させて無駄を省こうとしてくれていて、それによってsubscribe(on:)でキューを指定してもそれに応じないんじゃないかと思っています。FutureもいわゆるHotなので、キューを指定してもそれに従わないのと同じなんだと思います。

整理してみます

  • Cold動作で指定したバックグラウンドキューで動作させるなら
    • Justとmapの間に.receive(on:)を置く
    • Justとmapの間に.eraseToAnyPublisher()を置いて、.subscribe(on:)を置く
  • Hot動作 が望ましいなら
    • Futureにする(結局そうなるとDispatchQueue.global().async)

という感じでしょうか。私の予測が入り混じっているので全然自信はないです。

そして実際はアプリの開発にJustmapをやりたいわけじゃなく、本当はMergeManycollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。

printしたらmapreceive finishedが20回あるがreceive value:が19回の件

あくまで憶測ですがcollect()の動作が2種類ある気がします。

動作的には .subscribe(on)されてないColdなPublisherたちを下流にあるcollect()が扱う場合は、何かしらのタイミングで終了したという判断をしないといけないために上流がイベントを流さずにその上流ストリームも終了させられてしまう。一方、HotなPublisherもしくは.subscribe(on:)されたColdなPublisherたちのcollect()は、非同期実行を待つ前提であるため上流のストリームの終了を待つという感じなのかな、と思いました。思い込みかもしれませんが...。

以上、長々と憶測が多くなり失礼しましたー🙇

@susieyy
Copy link
Author

susieyy commented Nov 17, 2020

なるほど、@yimajo さん解説は、仮説ですが論理的に整合性が通っており、とても腑に落ちました。

Hot化されてしまうからsubscribe(on:)でバックグラウンドキューによる指定ができなくなるという状況

確かに、UpstreamがHotだと、Downstreamでsubscribe(on:)によるUpstreamのスケジュラーを変更はできなそうですね。

Just<Map< ...>>型のストリームをSubscriberがsubscribeした際にいわゆるHot化してくれようとしている

&

eraseToAnyPublisherでColdのまま動作

このあたりの挙動はドキュメントに明記もないですし、宣言的な記述にもかかわらず、書き手&読み手が挙動を直感的に類推しにくいですね。(悩ましい)

そして実際はアプリの開発にJustとmapをやりたいわけじゃなく、本当はMergeManyとcollectの検証のために今回のようにJustとmap書いただけではないかと思いますので、結局FutureにしてDeferred使うことに落ち着くかなという気もします。

手元のアプリはあまり他のアプリではないですが、200個ぐらいMergeManyで並列処理しています。
各並列処理をFutureで記述すると、200個のFutureを逐次インスタンス化しいるところからHotのため随時実行開始されます。
そうすると、読み手がFutureはHotであることを知っていても、コードを読んで上記の挙動を直感的にイメージしにくいなぁと思っています。
なので、MergeManyで並列処理を開始して、collectで全件待ち合わせる挙動の方が、記述から意図(挙動)を読み取りやすいかなと感じています。

Futureでお茶を濁そうかと思っていたので、@yimajo さんの subscribe(on:) の活用はとても助かりました。( JustはHotになりますが、mapがColdのままのでありがたい )
Deferredも意図する挙動に活用できそうですね。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment