LoginSignup
1
Organization

[Swift] Combine/RxSwift と Concurrency をシームレスに繋ぐオペレータで拡張する②

前置き

前回の記事からの続きです。

アーキテクチャーに Combine/RxSwift などのリアクティブプログラミングを用いていると、後続のModelやRepositoryなどでも、接続部分をリアクティブプログラミングで書くことがあるでしょう。(連結が楽でコードが綺麗になるため)

しかし、Concurrencyの登場によって、これが変わろうとしています。

今回は、一部の処理をConcurrencyに置き換えた際に、CombineからConcurrencyを呼ぶ必要が出てきたため、これを綺麗に書けないかと模索してみました。

初めに

前回の実装では Task がそのままだったため、処理をキャンセルできない状態になっていました。

今回はキャンセル処理を実装したので、新たに記事を書きました。
(前回の記事が長いので、分割する方が良いと考えた)

また、Combine のコミュニティで意見を求めたところ、TaskPriority が設定できた方が良いという意見も出てきたので、今回はそれも反映させています。

実装

複数の拡張した実装があるため、今回は以下のコードを例に実装を見ていきましょう。

.swift
func asyncSink(
    priority: TaskPriority? = nil,
    receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
    self.sink { value in
        Task(priority: priority) {
            await receiveValue(value)
        }
    }
}

こちらのコードは前回の記事にも掲載されているものになります。
TaskPriority のみ新規追加)

このコードでは、Task実行されたままになってしまうので、これをキャンセルできるようにします。

1. AnyCancellable の拡張

Task をキャンセルするには、インスタンスを保持して、他からキャンセルの処理を実施する必要があります。既存のコードに合わせると、今のようなイメージです。

func asyncSink(
    receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
+   var task: Task<Void, Never>?

    return self.sink { value in
+       task = Task(priority: priority) {
            await receiveValue(value)
        }
    }
    /* この辺で`Task`をキャンセルできる処理を入れたい */
}

このままではキャンセル処理が実行できないため、sink で返される AnyCancellable を拡張することで、実行できるようにします。

.swift
extension AnyCancellable {

    func cancel(completion: @escaping () -> Void) -> AnyCancellable {
        .init { completion() }
    }
}

(もっと良いアプローチがあればコメントいただけますとmm)

これで追加でキャンセルの処理を呼びことができます。

2. 拡張した処理の追加

拡張したコードを実装します。

func asyncSink(
    priority: TaskPriority? = nil,
    receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
    var task: Task<Void, Never>?

    return self.sink { value in
        task = Task(priority: priority) {
+           try? Task.checkCancellation()
            await receiveValue(value)
       }
+   }.cancel {
+       task?.cancel()
+   }
}

これで Task をキャンセルできるようになりました。
また、解放されるように checkCancellation を追加しています。

これで完了かと思いきや問題がありました。
コードをテストしていて動作しない場合があることに気づきました。

以下に例を出していきます。

- 動作するパターン

Just で動作を確認した場合は以下のようになります。

.swift
let exp = expectation(description: "wait for asyncSink")
        
var cancellable: AnyCancellable = Just<Int>(0)
    .asyncSink { [weak self] number in
        print("isCancelled: ", Task.isCancelled) // ・・・① false になる
        await Task.sleep(2 * NSEC_PER_SEC)       // ・・・② `cancel()` を確認するために2秒待つ
        print("isCancelled: ", Task.isCancelled) // ・・・③ true になる
        exp.fulfill()
    }

cancellable.cancel() // ・・・④ 処理を即時キャンセルする

wait(for: [exp], timeout: 3.0)

処理の時系列はこのようになります。

  1. 実行時はキャンセル前なので Task.isCancelledfalse になっている
  2. で2秒待っている間に、のキャンセル処理が実行される
  3. その結果、ではTask.isCancelledtrue になっている

つまり、ちゃんとキャンセル処理がなされていることが確認できます。

- 動作しないパターン

上記のテストを PassthroughSubject で確認した場合はうまく動作しませんでした。

根本的に asyncSink呼ばれていないことに気づいたため、以下のようなデバッグを行いました。具体的には handleEvents で動作を追って見ました。

.swift
let subject = PassthroughSubject<Void, Never>()
        
subject
    .handleEvents(receiveSubscription: {
        debugPrint("----receiveSubscription----", $0)
    }, receiveOutput: {
        debugPrint("----receiveOutput----", $0)
    }, receiveCompletion: {
        debugPrint("----receiveCompletion----", $0)
    }, receiveCancel: {
        debugPrint("----receiveCancel----")
    }, receiveRequest: {
        debugPrint("----receiveRequest----", $0)
    })
    .asyncSink {
        debugPrint("----asyncSink----", $0)
    }
    .store(in: &cancellable)

subject.send(())

ログを以下のようになっていました。

"----receiveSubscription----" PassthroughSubject
"----receiveRequest----" unlimited
"----receiveCancel----"

どうやら、内部的に勝手にキャンセルされていそうということがわかりました。

また、原因は以下のようにインスタンスを保持していないことだということもわかりました。

そこで、「2. 拡張した処理の追加」で作成したコードを改良していきます。

3. AnyCancellable の破棄を防ぐ

AnyCancellable を保持できる仕組みにすることで、破棄されないようにします。

func asyncSink(
    priority: TaskPriority? = nil,
    receiveValue: @escaping ((Self.Output) async -> Void)
- ) -> AnyCancellable {
+ ) -> Set<AnyCancellable> {
    // `AnyCancellable` を保持できるようにする
+   var set = Set<AnyCancellable>()
    var task: Task<Void, Never>?
 
-   return self.sink { value in   
+   let cancellable = self.sink { value in
        task = Task(priority: priority) {
           try? Task.checkCancellation()
            await receiveValue(value)
        }
    }
    
    // `AnyCancellable` を保持する
+   cancellable
+       .store(in: &set)
    
    // 別途、キャンセル処理を実装し、同じく保持する
+   cancellable
+       .cancel { task?.cancel() }
+       .store(in: &set)
    
    // 保持した `AnyCancellable` を返す
    return set
}

ポイントとしては、sink 作られた AnyCancellable を保持することです。.cancel は別途作成して別々にします。

また、今まで返り値は単体の AnyCancellable だったのに対し、複数である Set<AnyCancellable> になっているので、store が複数でも行えるように拡張すると、実際の運用でも書きやすいです。
(別に拡張する必要はないが、ボイラーテンプレートになるので)

.swift
extension Sequence where Element == AnyCancellable {

    func store<C>(
        in collection: inout C
    ) where C: RangeReplaceableCollection, C.Element == Element {
        forEach { $0.store(in: &collection) }
    }

    func store(in set: inout Set<Element>) {
        forEach { $0.store(in: &set) }
    }

    func cancel() {
        forEach { $0.cancel() }
    }
}

このコードを PassthroughSubject で動作を確認した場合は以下のようになります。

.swift
let exp = expectation(description: "wait for asyncSink")
let subject = PassthroughSubject<Void, Never>()
var cancellables = Set<AnyCancellable>()

subject.asyncSink(priority: .background) {
    XCTAssertFalse(Task.isCancelled)                     // false を出力
    try? await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC) // キャンセル処理を2秒待つ
    XCTAssertTrue(Task.isCancelled)                      // true を出力
    exp.fulfill()
}.store(in: &cancellables)

subject.send(())      // 処理の実行
sleep(1)              // キャンセル処理の実行を1秒だけ遅らせる
cancellables.cancel() // キャンセルの実行

wait(for: [exp], timeout: 3.0)

今度はちゃんとキャンセル処理がなされていることが確認できました。

今回、紹介したものをあくまでも一例ですが、このようにキャンセルの処理を実装することができました。

その他

他の拡張も合わせてライブラリ実装にまとめてあるので、中のコードを参考にしていただければ幸いですmm

PR や Issue もお待ちしています!

終わりに

実は Combine コミュニティで提案された AsyncStream を使ったキャンセル処理もあるのですが、コードがあまりにも多いため、個人的にはもう少しライトに実装できるものがよいと考えて採用を見送りましたmm

AsyncStream でながながと処理の追えないコードを実装するくらいなら、前回の記事でも紹介した CombineExt にある create コードをコピーする方が良いと思ったので、そこまでするほどでもないと考えたためです。

まだ、改善の余地はあると思うので、コメントなどあれば PR いただけますとmm

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
1