前置き
前回の記事からの続きです。
アーキテクチャーに Combine/RxSwift などのリアクティブプログラミングを用いていると、後続のModelやRepositoryなどでも、接続部分をリアクティブプログラミングで書くことがあるでしょう。(連結が楽でコードが綺麗になるため)
しかし、Concurrencyの登場によって、これが変わろうとしています。
今回は、一部の処理をConcurrencyに置き換えた際に、CombineからConcurrencyを呼ぶ必要が出てきたため、これを綺麗に書けないかと模索してみました。
初めに
前回の実装では Task
がそのままだったため、処理をキャンセルできない状態になっていました。
今回はキャンセル処理を実装したので、新たに記事を書きました。
(前回の記事が長いので、分割する方が良いと考えた)
また、Combine
のコミュニティで意見を求めたところ、TaskPriority
が設定できた方が良いという意見も出てきたので、今回はそれも反映させています。
実装
複数の拡張した実装があるため、今回は以下のコードを例に実装を見ていきましょう。
func asyncSink(
priority: TaskPriority? = nil,
receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
self.sink { value in
Task(priority: priority) {
await receiveValue(value)
}
}
}
こちらのコードは前回の記事にも掲載されているものになります。
(TaskPriority
のみ新規追加)
このコードでは、Task
が実行されたままになってしまうので、これをキャンセルできるようにします。
1. AnyCancellable の拡張
Task
をキャンセルするには、インスタンスを保持して、他からキャンセルの処理を実施する必要があります。既存のコードに合わせると、今のようなイメージです。
func asyncSink(
receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
+ var task: Task<Void, Never>?
return self.sink { value in
+ task = Task(priority: priority) {
await receiveValue(value)
}
}
/* この辺で`Task`をキャンセルできる処理を入れたい */
}
このままではキャンセル処理が実行できないため、sink
で返される AnyCancellable
を拡張することで、実行できるようにします。
extension AnyCancellable {
func cancel(completion: @escaping () -> Void) -> AnyCancellable {
.init { completion() }
}
}
(もっと良いアプローチがあればコメントいただけますとmm)
これで追加でキャンセルの処理を呼びことができます。
2. 拡張した処理の追加
拡張したコードを実装します。
func asyncSink(
priority: TaskPriority? = nil,
receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
var task: Task<Void, Never>?
return self.sink { value in
task = Task(priority: priority) {
+ try? Task.checkCancellation()
await receiveValue(value)
}
+ }.cancel {
+ task?.cancel()
+ }
}
これで Task
をキャンセルできるようになりました。
また、解放されるように checkCancellation
を追加しています。
これで完了かと思いきや問題がありました。
コードをテストしていて動作しない場合があることに気づきました。
以下に例を出していきます。
- 動作するパターン
Just
で動作を確認した場合は以下のようになります。
let exp = expectation(description: "wait for asyncSink")
var cancellable: AnyCancellable = Just<Int>(0)
.asyncSink { [weak self] number in
print("isCancelled: ", Task.isCancelled) // ・・・① false になる
await Task.sleep(2 * NSEC_PER_SEC) // ・・・② `cancel()` を確認するために2秒待つ
print("isCancelled: ", Task.isCancelled) // ・・・③ true になる
exp.fulfill()
}
cancellable.cancel() // ・・・④ 処理を即時キャンセルする
wait(for: [exp], timeout: 3.0)
処理の時系列はこのようになります。
- 実行時
①
はキャンセル前なのでTask.isCancelled
はfalse
になっている -
②
で2秒待っている間に、④
のキャンセル処理が実行される - その結果、
③
ではTask.isCancelled
はtrue
になっている
つまり、ちゃんとキャンセル処理がなされていることが確認できます。
- 動作しないパターン
上記のテストを PassthroughSubject
で確認した場合はうまく動作しませんでした。
根本的に asyncSink
が呼ばれていないことに気づいたため、以下のようなデバッグを行いました。具体的には handleEvents
で動作を追って見ました。
let subject = PassthroughSubject<Void, Never>()
subject
.handleEvents(receiveSubscription: {
debugPrint("----receiveSubscription----", $0)
}, receiveOutput: {
debugPrint("----receiveOutput----", $0)
}, receiveCompletion: {
debugPrint("----receiveCompletion----", $0)
}, receiveCancel: {
debugPrint("----receiveCancel----")
}, receiveRequest: {
debugPrint("----receiveRequest----", $0)
})
.asyncSink {
debugPrint("----asyncSink----", $0)
}
.store(in: &cancellable)
subject.send(())
ログを以下のようになっていました。
"----receiveSubscription----" PassthroughSubject
"----receiveRequest----" unlimited
"----receiveCancel----"
どうやら、内部的に勝手にキャンセルされていそうということがわかりました。
また、原因は以下のようにインスタンスを保持していないことだということもわかりました。
そこで、「2. 拡張した処理の追加」
で作成したコードを改良していきます。
3. AnyCancellable の破棄を防ぐ
AnyCancellable
を保持できる仕組みにすることで、破棄されないようにします。
func asyncSink(
priority: TaskPriority? = nil,
receiveValue: @escaping ((Self.Output) async -> Void)
- ) -> AnyCancellable {
+ ) -> Set<AnyCancellable> {
// `AnyCancellable` を保持できるようにする
+ var set = Set<AnyCancellable>()
var task: Task<Void, Never>?
- return self.sink { value in
+ let cancellable = self.sink { value in
task = Task(priority: priority) {
try? Task.checkCancellation()
await receiveValue(value)
}
}
// `AnyCancellable` を保持する
+ cancellable
+ .store(in: &set)
// 別途、キャンセル処理を実装し、同じく保持する
+ cancellable
+ .cancel { task?.cancel() }
+ .store(in: &set)
// 保持した `AnyCancellable` を返す
return set
}
ポイントとしては、sink
作られた AnyCancellable
を保持することです。.cancel
は別途作成して別々にします。
また、今まで返り値は単体の AnyCancellable
だったのに対し、複数である Set<AnyCancellable>
になっているので、store
が複数でも行えるように拡張すると、実際の運用でも書きやすいです。
(別に拡張する必要はないが、ボイラーテンプレートになるので)
extension Sequence where Element == AnyCancellable {
func store<C>(
in collection: inout C
) where C: RangeReplaceableCollection, C.Element == Element {
forEach { $0.store(in: &collection) }
}
func store(in set: inout Set<Element>) {
forEach { $0.store(in: &set) }
}
func cancel() {
forEach { $0.cancel() }
}
}
このコードを PassthroughSubject
で動作を確認した場合は以下のようになります。
let exp = expectation(description: "wait for asyncSink")
let subject = PassthroughSubject<Void, Never>()
var cancellables = Set<AnyCancellable>()
subject.asyncSink(priority: .background) {
XCTAssertFalse(Task.isCancelled) // false を出力
try? await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC) // キャンセル処理を2秒待つ
XCTAssertTrue(Task.isCancelled) // true を出力
exp.fulfill()
}.store(in: &cancellables)
subject.send(()) // 処理の実行
sleep(1) // キャンセル処理の実行を1秒だけ遅らせる
cancellables.cancel() // キャンセルの実行
wait(for: [exp], timeout: 3.0)
今度はちゃんとキャンセル処理がなされていることが確認できました。
今回、紹介したものをあくまでも一例ですが、このようにキャンセルの処理を実装することができました。
その他
他の拡張も合わせてライブラリ実装にまとめてあるので、中のコードを参考にしていただければ幸いですmm
PR や Issue もお待ちしています!
終わりに
実は Combine
コミュニティで提案された AsyncStream
を使ったキャンセル処理もあるのですが、コードがあまりにも多いため、個人的にはもう少しライトに実装できるものがよいと考えて採用を見送りましたmm
AsyncStream
でながながと処理の追えないコードを実装するくらいなら、前回の記事でも紹介した CombineExt にある create
コードをコピーする方が良いと思ったので、そこまでするほどでもないと考えたためです。
まだ、改善の余地はあると思うので、コメントなどあれば PR いただけますとmm