0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Combine の receive(on: ) の挙動が怪しい

Last updated at Posted at 2023-03-11

はじめに

最近、私が携わっているアプリを RxSwift から Combine へ置き換える機会がありました。そこで挙動を調べていたのですが、 receive(on: ) を行っていると sink でイベントが流れてこなくなる事がありました。

使用環境

  • Xcode 14.2
  • iOS 16.2
  • macOS 13.2.1

発生した現象

ViewController.swift
import Combine
import UIKit

class ViewController: UIViewController {
    private var cancellables: Set<AnyCancellable> = []

    override func viewDidLoad() {
        super.viewDidLoad()
        
        Just("output")
            .setFailureType(to: Error.self)
            .receive(on: DispatchQueue.global())
            .sink(
                receiveCompletion: { completion in
                    switch completion {
                    case .finished:
                        print("finished")
                    case .failure:
                        print("failure")
                    }
                },
                receiveValue: { output in
                    print(output)
                }
            )
            .store(in: &self.cancellables)
    }
}

これを実行すると、ほとんど下記のように出力されます。

output
finished

が、極稀に ( 体感 100 回に 1 回くらい ) で下記のように出力されました。

finished

receiveValue の処理を通らずに、 .finished が流れてくるような現象となっていたため、調べていくことにしました。

調べた結果

receive(on: ) の中身を DispatchQueue.main にする

Just("output")
    .setFailureType(to: Error.self)
-    .receive(on: DispatchQueue.global())
+    .receive(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)

→ 問題発生しない

receive → subscribe にする ( Scheduler は DispatchQueue.global() のまま )

Just("output")
    .setFailureType(to: Error.self)
-    .receive(on: DispatchQueue.global())
+    .subscribe(on: DispatchQueue.global())
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)

→ 問題発生しない

subscribe(on: ) の中身を DispatchQueue.main にする

Just("output")
    .setFailureType(to: Error.self)
-    .receive(on: DispatchQueue.global())
+    .subscribe(on: DispatchQueue.main)
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)

→ 問題発生しない

subscribe(on: ) を加える

Just("output")
    .setFailureType(to: Error.self)
+    .subscribe(on: DispatchQueue.global())
    .receive(on: DispatchQueue.global())
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)
Just("output")
    .setFailureType(to: Error.self)
+    .subscribe(on: DispatchQueue.main)
    .receive(on: DispatchQueue.global())
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)

→ 問題発生する

結論

調査結果から、 receive(on: ) で DispatchQueue.global() を指定している場合にイベントが発火しないことがあり得るようでした。

ですので実装としては

outputを
メインスレッドで処理したい
outputを
サブスレッドで処理したい
イベント発火を
メインスレッドで処理したい
.subcribe(on: DispatchQueue.main) ???
イベント発火を
サブスレッドで処理したい
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.subscribe(on: DispatchQueue.global())

として、右上の領域のみ対応策がわかりきれませんでした。一応右上のような状況になるケースが思いつかなかったので、あまり考える必要ないのかもしれませんが、、、

もし調査結果の間違いや現象の原因、対応方法などありましたらコメントいただけますと幸いです。

追記 ( 2023/03/12 )

コメントにてご指摘いただきました。どうやらこれは正常な動作で、 DispatchQueue.global() は concurrent queue のため output と finished の流れてくる順序が前後し、 receiveValue を処理するよりも先に購読完了してしまったということでした。
これが正常な動作な場合 receive(on: ) でサブスレッドを指定したい場合どのようにするのがいいのだろう?と改めて調べたところ、下記の Stack Overflow を見つけました。
https://stackoverflow.com/questions/66030354/sink-receivevalue-is-not-called-after-publisher-receiveon-backgroundqueue

Just("output")
    .setFailureType(to: Error.self)
-    .receive(on: DispatchQueue.global())
+    .receive(on: DispatchQueue(label: "work in subthread", qos: .background))
    .sink(
        receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("finished")
            case .failure:
                print("failure")
            }
        },
        receiveValue: { output in
            print(output)
        }
    )
    .store(in: &self.cancellables)

→ 問題発生しない

concurrent ではなく serial の Dispatch Queue を作成することで、 サブスレッドでも順番を維持したまま処理を行えるということでした。私が普段からほぼ DispatchQueue.main か DispatchQueue.global() しか使用しておらず考えが及ばなかったため、今回の調査を行うことになってしまいました。もう少し GCD について深く勉強します。

0
1
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?