はじめに
最近、私が携わっているアプリを RxSwift から Combine へ置き換える機会がありました。そこで挙動を調べていたのですが、 receive(on: ) を行っていると sink でイベントが流れてこなくなる事がありました。
使用環境
- Xcode 14.2
- iOS 16.2
- macOS 13.2.1
発生した現象
import Combine
import UIKit
class ViewController: UIViewController {
private var cancellables: Set<AnyCancellable> = []
override func viewDidLoad() {
super.viewDidLoad()
Just("output")
.setFailureType(to: Error.self)
.receive(on: DispatchQueue.global())
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
}
}
これを実行すると、ほとんど下記のように出力されます。
output
finished
が、極稀に ( 体感 100 回に 1 回くらい ) で下記のように出力されました。
finished
receiveValue の処理を通らずに、 .finished が流れてくるような現象となっていたため、調べていくことにしました。
調べた結果
receive(on: ) の中身を DispatchQueue.main にする
Just("output")
.setFailureType(to: Error.self)
- .receive(on: DispatchQueue.global())
+ .receive(on: DispatchQueue.main)
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
→ 問題発生しない
receive → subscribe にする ( Scheduler は DispatchQueue.global() のまま )
Just("output")
.setFailureType(to: Error.self)
- .receive(on: DispatchQueue.global())
+ .subscribe(on: DispatchQueue.global())
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
→ 問題発生しない
subscribe(on: ) の中身を DispatchQueue.main にする
Just("output")
.setFailureType(to: Error.self)
- .receive(on: DispatchQueue.global())
+ .subscribe(on: DispatchQueue.main)
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
→ 問題発生しない
subscribe(on: ) を加える
Just("output")
.setFailureType(to: Error.self)
+ .subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.global())
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
Just("output")
.setFailureType(to: Error.self)
+ .subscribe(on: DispatchQueue.main)
.receive(on: DispatchQueue.global())
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
→ 問題発生する
結論
調査結果から、 receive(on: ) で DispatchQueue.global() を指定している場合にイベントが発火しないことがあり得るようでした。
ですので実装としては
| outputを メインスレッドで処理したい |
outputを サブスレッドで処理したい |
|
|---|---|---|
| イベント発火を メインスレッドで処理したい |
.subcribe(on: DispatchQueue.main) | ??? |
| イベント発火を サブスレッドで処理したい |
.subscribe(on: DispatchQueue.global()) .receive(on: DispatchQueue.main) |
.subscribe(on: DispatchQueue.global()) |
として、右上の領域のみ対応策がわかりきれませんでした。一応右上のような状況になるケースが思いつかなかったので、あまり考える必要ないのかもしれませんが、、、
もし調査結果の間違いや現象の原因、対応方法などありましたらコメントいただけますと幸いです。
追記 ( 2023/03/12 )
コメントにてご指摘いただきました。どうやらこれは正常な動作で、 DispatchQueue.global() は concurrent queue のため output と finished の流れてくる順序が前後し、 receiveValue を処理するよりも先に購読完了してしまったということでした。
これが正常な動作な場合 receive(on: ) でサブスレッドを指定したい場合どのようにするのがいいのだろう?と改めて調べたところ、下記の Stack Overflow を見つけました。
https://stackoverflow.com/questions/66030354/sink-receivevalue-is-not-called-after-publisher-receiveon-backgroundqueue
Just("output")
.setFailureType(to: Error.self)
- .receive(on: DispatchQueue.global())
+ .receive(on: DispatchQueue(label: "work in subthread", qos: .background))
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
print("finished")
case .failure:
print("failure")
}
},
receiveValue: { output in
print(output)
}
)
.store(in: &self.cancellables)
→ 問題発生しない
concurrent ではなく serial の Dispatch Queue を作成することで、 サブスレッドでも順番を維持したまま処理を行えるということでした。私が普段からほぼ DispatchQueue.main か DispatchQueue.global() しか使用しておらず考えが及ばなかったため、今回の調査を行うことになってしまいました。もう少し GCD について深く勉強します。