LoginSignup
20
8

More than 1 year has passed since last update.

[Swift] Combine/RxSwift と Concurrency をシームレスに繋ぐオペレータで拡張する

Last updated at Posted at 2022-10-29

※ 本記事では Combine のサンプルコードをメインに記載していきます。
※ 例では AnyCancellableDisposeBag は冗長になるので記載していない部分があります。ご了承ください。

初めに

アーキテクチャーに Combine/RxSwift などのリアクティブプログラミングを用いていると、後続のModelRepositoryなどでも、接続部分をリアクティブプログラミングで書くことがあるでしょう。(連結が楽でコードが綺麗になるため)

しかし、Concurrencyの登場によって、これが変わろうとしています。

今回は、一部の処理をConcurrencyに置き換えた際に、CombineからConcurrencyを呼ぶ必要が出てきたため、これを綺麗に書けないかと模索してみました。

課題

リアクティブプログラミングにおいて、処理の末端Concurrencyを呼ぶ分には特に問題ありません。例を見ていきましょう。

簡単なConcurrencyのメソッドを用意しました。

.swift
func calculate(number: Int) async -> Int {
    return number + 2
}

これを呼んでいきます。

【Example1】

.swift
Just(10)
    .map { $0 / 2 }
    .sink { number in 
        Task {
            let result = await calculate(number: number)
            print(result) // 「7」が出力される
        }
    }
    .store(in: &cancelables)

無事に実行されています。気になる点としては、毎回Taskを書くことになりネストしてしまうので、これを綺麗に書きたいと思うことです。(これに関しては後述します。)

【Example2】

処理の途中でConcurrencyの処理を実施すると問題が発生します。

.swift
Just(10)
    .map { $0 / 2 }
    .handleEvents(receiveOutput: { number in
        Task {
            let result = await task(number: number)
            return result // エラーになる
        }
    })
    .sink { number in 
        print(number)
    }
    .store(in: &cancelables)

途中で処理を加工して後続に渡すことができません。
そこで、処理を隠蔽化してこれを解決していきます。

実装

RxSwiftにはRxConcurrencyというライブラリが既にあるので、Combineでの実装もこちらを参考に実装していきます。

1. RxSwift での実装を見る

ライブラリの実装を抜粋すると、以下の部分になります。

.swift
extension ObservableType {

    static func async(_ handler: @escaping () async throws -> Element) -> Observable<Element> {
        Observable<Element>.create { observer in
            let task = Task {
                do {
                    observer.on(.next(try await handler()))
                    observer.on(.completed)
                } catch {
                    observer.on(.error(error))
                }
            }
            
            return Disposables.create {
                task.cancel()
            }
        }
    }
}

行っていることはシンプルで、内部でObservablecreateして、Coucurrencyの処理をラップしています。

実際に使用する場合はこのようになります。

.swift
Observable<Int>
    .async {
        await self.calculate(number: 10)
    }
    .subscribe { number in
        print(number) // 「12」が出力される
    }
    .disposed(by: disposeBag)

このように処理の途中で、Coucurrencyの処理を実行できます。

2. Combine での実装

これをCombineで実装していきます。ただ、CombineにはRxSwiftcreateに相当するオペレータがないため、先ほどのように実装することができません。

もし、Combineの拡張ライブラリであるCombineExtを使った場合は、createのオペレータがあるので、同じように拡張が可能です。

- CombineExt を使った場合

以下のcreateのオペレータを使って拡張していきます。

では、実装を見ていきましょう。

.swift
extension Publishers {
        
    static func async<Output>(_ handler: @escaping () async -> Output) -> AnyPublisher<Output, Never> {
        AnyPublisher<Output, Never>.create { subscriber in
            let task = Task {
                let result = await handler()
                subscriber.send(result)
                subscriber.send(completion: .finished)
            }

            return AnyCancellable {
                task.cancel()
            }
        }
    }
}

最初に提示した RxSwift の拡張コードと同じ構造になっていることがわかります。

また、実際に使用する場合はこのようになります。

.swift
Publishers
    .async {
        await self.calculate(number: 10)
    }
    .sink { number in
        print(number) // 「12」が出力される
    }
    .store(in: &cancellable)

このように簡単に実装ができました。

今回は Publishers(かつstatic func)で拡張していますが、個別にそれぞれを拡張することも可能です。例えば、以下は Just を拡張した例です。

.swift
extension Just {
    
    func async<Input, Output>(_ handler: @escaping (Input) async -> Output) -> AnyPublisher<Output, Never> {
        AnyPublisher<Output, Never>.create { subscriber in
            let task = Task {
                let result = await handler(output as! Input) // ※注意
                subscriber.send(result)
                subscriber.send(completion: .finished)
            }

            return AnyCancellable {
                task.cancel()
            }
        }
    }
}

output プロパティは Just から流れてくる値で、値の型でキャストしてあげる必要がある。

実際に使用する場合はこのようになります。

.swift
Just<Int>(10)
    .async { number in
        await self.calculate(number: number)
    }
    .sink { number in
        print(number) // 「12」が出力される
    }
    .store(in: &cancellable)

このように必要に応じて、拡張をいくつか用意してあげることで柔軟に実装できるようになります。ただし、冒頭にも述べた通り、拡張ライブラリであるCombineExtを使った場合にのみ実装可能です。

では、拡張ライブラリを使わない方法を考えていきましょう。

- CombineExt を使わない場合

先ほど言及した create にあたるオペレータを自前で実装するか、それと同じような挙動をするものを用意することになります。

拡張ライブラリである CombineExt の中にある create のコードをコピーすれば良いのでは?と思うかもしれませんが、思いのほか依存関係が多いため、いくつかのコードを引っ張ってくる必要があり、自前でミニマムに実装したほうがコードも少なくなります

flatMapFuture を使って拡張します。


① エラーハンドリングを必要としない拡張
.swift
extension Publisher {
    
    func asyncMap<V>(
        _ asyncFunction: @escaping (Output) async -> V
    ) -> Publishers.FlatMap<Future<V, Never>, Self> {
        
        flatMap { value in
            Future { promise in
                Task {
                    promise(.success(await asyncFunction(value)))
                }
            }
        }
    }
}

実際に使用する場合はこのようになります。

.swift
Just<Int>(20)
    .asyncMap { number in
        await self.calculate(number: number)
    }
    .sink { number in
        print(number) // 「22」が出力される
    }
    .store(in: &cancellable)

(例は Just ですが Publisher であれば使用可能です。)


② エラーハンドリングできる拡張

エラーハンドリングをしたい場合は、先ほどのものをさらに拡張します。

.swift
extension Publisher {

    func asyncMapWithThrows<V>(
        _ asyncFunction: @escaping (Output) async throws -> V
    ) -> Publishers.FlatMap<Future<V, Error>, Publishers.SetFailureType<Self, Error>> {
        
        flatMap { value in
            Future { promise in
                Task {
                    do {
                        let output = try await asyncFunction(value)
                        promise(.success(output))
                    } catch {
                        promise(.failure(error))
                    }
                }
            }
        }
    }
}

実際に使用する場合はこのようになります。

.swift
let subject = PassthroughSubject<(), Never>()

subject
    .asyncMapWithThrows {
        try await APIClient.fetch()
    }
    .sink(receiveCompletion: { result in
        // handle result
    }, receiveValue: { value in
        // handle value
    })
    .store(in: &cancellable)

subject.send(())

Publisher の途中でAPIを叩くパターンです。

このように Combine -> Concurrency -> Combine をシームレスに実行できるようになります。


- 補足

ちなみに TaskPriority を引数で渡せるようにすることも可能ですが、Combine.subscribe(on: ) とバッティングしてしまうため、設定できないようにしています。
CombineExt の PR で同様の指摘がされており、RxConcurrency では引数を渡さない作りになっているので、それを採用した形)


3. 番外編

Task のネストを減らす

気になる点としては、毎回Taskを書くことになりネストしてしまうので、これを綺麗に書きたいと思うことです。

冒頭に記載した Taskのネストを解消したい場合です。

単純に Combine -> Concurrency を行う場合は冒頭にも書きましたが、

.swift
Just(10)
    .map { $0 / 2 }
    .sink { number in 
        Task {
            do {
                // do some async task
            } catch {
                // error handling
            }
        }
    }
    .store(in: &cancelables)

となり、sinkTask で1階層ネストしてしまい、エラーハンドリングをする場合 do catch でさらに深くなってしまいます。そこでラップした関数を用意して解消いきます。


① Failure が Never の場合
.swift
extension Publisher where Self.Failure == Never {

    func asyncSink(
        receiveValue: @escaping ((Self.Output) async -> Void)
    ) -> AnyCancellable {
        self.sink { value in
            Task {
                await receiveValue(value)
            }
        }
    }
}

実際に使用する場合はこのようになります。

.swift
Just<Int>(30)
    .asyncSink { number in
        let result = await self.calculate(number: number)
        print(result) // 「32」が出力される
    }
    .store(in: &cancellable)

非常にすっきりしました。


② Failure が Error の場合
.swift
extension Publisher where Self.Failure == Error {

    func asyncSinkWithThrows(
        receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) async -> Void),
        receiveValue: @escaping ((Self.Output) async -> Void)
    ) -> AnyCancellable {
        sink(receiveCompletion: { result in
            Task { await receiveCompletion(result) }
        }, receiveValue: { value in
            Task { await receiveValue(value) }
        })
    }
}

実際に使用する場合はこのようになります。

.swift
let subject = PassthroughSubject<(), Never>()

subject
    .setFailureType(to: Error.self)
    .asyncSinkWithThrows(receiveCompletion: { result in
        // handling result
    }, receiveValue: {
        let response = try await APIClient.fetch()
        // handling response
    })
    .store(in: &cancellable)

subject.send(())

今回はエラーがないパターンで行なっているのであれですが、receiveValue でも Concurrency を実装可能です。

終わりに

長くなりましたが、こちらを諸々実装したライブラリを作成しましたので、お手軽にご使用いただけますと幸いです!
追加の拡張やPRお待ちしてます!

スターください!

追記

Task のキャンセル対応のために新しい記事を追記しました!

その他

追加の拡張

RxConcurrency には他の拡張実装があるので、参考にしてみると良いでしょう。

参考文献

20
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
8