※ 本記事では Combine
のサンプルコードをメインに記載していきます。
※ 例では AnyCancellable
や DisposeBag
は冗長になるので記載していない部分があります。ご了承ください。
初めに
アーキテクチャーに Combine
/RxSwift
などのリアクティブプログラミングを用いていると、後続のModel
やRepository
などでも、接続部分をリアクティブプログラミングで書くことがあるでしょう。(連結が楽でコードが綺麗になるため)
しかし、Concurrency
の登場によって、これが変わろうとしています。
今回は、一部の処理をConcurrency
に置き換えた際に、Combine
からConcurrency
を呼ぶ必要が出てきたため、これを綺麗に書けないかと模索してみました。
課題
リアクティブプログラミングにおいて、処理の末端でConcurrency
を呼ぶ分には特に問題ありません。例を見ていきましょう。
簡単なConcurrency
のメソッドを用意しました。
func calculate(number: Int) async -> Int {
return number + 2
}
これを呼んでいきます。
【Example1】
Just(10)
.map { $0 / 2 }
.sink { number in
Task {
let result = await calculate(number: number)
print(result) // 「7」が出力される
}
}
.store(in: &cancelables)
無事に実行されています。気になる点としては、毎回Task
を書くことになりネストしてしまうので、これを綺麗に書きたいと思うことです。(これに関しては後述します。)
【Example2】
処理の途中でConcurrency
の処理を実施すると問題が発生します。
Just(10)
.map { $0 / 2 }
.handleEvents(receiveOutput: { number in
Task {
let result = await task(number: number)
return result // エラーになる
}
})
.sink { number in
print(number)
}
.store(in: &cancelables)
途中で処理を加工して後続に渡すことができません。
そこで、処理を隠蔽化してこれを解決していきます。
実装
RxSwift
にはRxConcurrency
というライブラリが既にあるので、Combine
での実装もこちらを参考に実装していきます。
1. RxSwift での実装を見る
ライブラリの実装を抜粋すると、以下の部分になります。
extension ObservableType {
static func async(_ handler: @escaping () async throws -> Element) -> Observable<Element> {
Observable<Element>.create { observer in
let task = Task {
do {
observer.on(.next(try await handler()))
observer.on(.completed)
} catch {
observer.on(.error(error))
}
}
return Disposables.create {
task.cancel()
}
}
}
}
行っていることはシンプルで、内部でObservable
をcreate
して、Coucurrency
の処理をラップしています。
実際に使用する場合はこのようになります。
Observable<Int>
.async {
await self.calculate(number: 10)
}
.subscribe { number in
print(number) // 「12」が出力される
}
.disposed(by: disposeBag)
このように処理の途中で、Coucurrency
の処理を実行できます。
2. Combine での実装
これをCombine
で実装していきます。ただ、Combine
にはRxSwift
のcreate
に相当するオペレータがないため、先ほどのように実装することができません。
もし、Combine
の拡張ライブラリであるCombineExtを使った場合は、create
のオペレータがあるので、同じように拡張が可能です。
- CombineExt を使った場合
以下のcreate
のオペレータを使って拡張していきます。
では、実装を見ていきましょう。
extension Publishers {
static func async<Output>(_ handler: @escaping () async -> Output) -> AnyPublisher<Output, Never> {
AnyPublisher<Output, Never>.create { subscriber in
let task = Task {
let result = await handler()
subscriber.send(result)
subscriber.send(completion: .finished)
}
return AnyCancellable {
task.cancel()
}
}
}
}
最初に提示した RxSwift
の拡張コードと同じ構造になっていることがわかります。
また、実際に使用する場合はこのようになります。
Publishers
.async {
await self.calculate(number: 10)
}
.sink { number in
print(number) // 「12」が出力される
}
.store(in: &cancellable)
このように簡単に実装ができました。
今回は Publishers
(かつstatic func
)で拡張していますが、個別にそれぞれを拡張することも可能です。例えば、以下は Just
を拡張した例です。
extension Just {
func async<Input, Output>(_ handler: @escaping (Input) async -> Output) -> AnyPublisher<Output, Never> {
AnyPublisher<Output, Never>.create { subscriber in
let task = Task {
let result = await handler(output as! Input) // ※注意
subscriber.send(result)
subscriber.send(completion: .finished)
}
return AnyCancellable {
task.cancel()
}
}
}
}
※ output
プロパティは Just
から流れてくる値で、値の型でキャストしてあげる必要がある。
実際に使用する場合はこのようになります。
Just<Int>(10)
.async { number in
await self.calculate(number: number)
}
.sink { number in
print(number) // 「12」が出力される
}
.store(in: &cancellable)
このように必要に応じて、拡張をいくつか用意してあげることで柔軟に実装できるようになります。ただし、冒頭にも述べた通り、拡張ライブラリであるCombineExtを使った場合にのみ実装可能です。
では、拡張ライブラリを使わない方法を考えていきましょう。
- CombineExt を使わない場合
先ほど言及した create
にあたるオペレータを自前で実装するか、それと同じような挙動をするものを用意することになります。
拡張ライブラリである CombineExt
の中にある create
のコードをコピーすれば良いのでは?と思うかもしれませんが、思いのほか依存関係が多いため、いくつかのコードを引っ張ってくる必要があり、自前でミニマムに実装したほうがコードも少なくなります。
flatMap
と Future
を使って拡張します。
① エラーハンドリングを必要としない拡張
extension Publisher {
func asyncMap<V>(
_ asyncFunction: @escaping (Output) async -> V
) -> Publishers.FlatMap<Future<V, Never>, Self> {
flatMap { value in
Future { promise in
Task {
promise(.success(await asyncFunction(value)))
}
}
}
}
}
実際に使用する場合はこのようになります。
Just<Int>(20)
.asyncMap { number in
await self.calculate(number: number)
}
.sink { number in
print(number) // 「22」が出力される
}
.store(in: &cancellable)
(例は Just
ですが Publisher
であれば使用可能です。)
② エラーハンドリングできる拡張
エラーハンドリングをしたい場合は、先ほどのものをさらに拡張します。
extension Publisher {
func asyncMapWithThrows<V>(
_ asyncFunction: @escaping (Output) async throws -> V
) -> Publishers.FlatMap<Future<V, Error>, Publishers.SetFailureType<Self, Error>> {
flatMap { value in
Future { promise in
Task {
do {
let output = try await asyncFunction(value)
promise(.success(output))
} catch {
promise(.failure(error))
}
}
}
}
}
}
実際に使用する場合はこのようになります。
let subject = PassthroughSubject<(), Never>()
subject
.asyncMapWithThrows {
try await APIClient.fetch()
}
.sink(receiveCompletion: { result in
// handle result
}, receiveValue: { value in
// handle value
})
.store(in: &cancellable)
subject.send(())
Publisher
の途中でAPIを叩くパターンです。
このように Combine
-> Concurrency
-> Combine
をシームレスに実行できるようになります。
- 補足
ちなみに TaskPriority
を引数で渡せるようにすることも可能ですが、Combine
の .subscribe(on: )
とバッティングしてしまうため、設定できないようにしています。
(CombineExt
の PR で同様の指摘がされており、RxConcurrency
では引数を渡さない作りになっているので、それを採用した形)
3. 番外編
Task のネストを減らす
気になる点としては、毎回
Task
を書くことになりネストしてしまうので、これを綺麗に書きたいと思うことです。
冒頭に記載した Task
のネストを解消したい場合です。
単純に Combine
-> Concurrency
を行う場合は冒頭にも書きましたが、
Just(10)
.map { $0 / 2 }
.sink { number in
Task {
do {
// do some async task
} catch {
// error handling
}
}
}
.store(in: &cancelables)
となり、sink
と Task
で1階層ネストしてしまい、エラーハンドリングをする場合 do catch
でさらに深くなってしまいます。そこでラップした関数を用意して解消いきます。
① Failure が Never の場合
extension Publisher where Self.Failure == Never {
func asyncSink(
receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
self.sink { value in
Task {
await receiveValue(value)
}
}
}
}
実際に使用する場合はこのようになります。
Just<Int>(30)
.asyncSink { number in
let result = await self.calculate(number: number)
print(result) // 「32」が出力される
}
.store(in: &cancellable)
非常にすっきりしました。
② Failure が Error の場合
extension Publisher where Self.Failure == Error {
func asyncSinkWithThrows(
receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) async -> Void),
receiveValue: @escaping ((Self.Output) async -> Void)
) -> AnyCancellable {
sink(receiveCompletion: { result in
Task { await receiveCompletion(result) }
}, receiveValue: { value in
Task { await receiveValue(value) }
})
}
}
実際に使用する場合はこのようになります。
let subject = PassthroughSubject<(), Never>()
subject
.setFailureType(to: Error.self)
.asyncSinkWithThrows(receiveCompletion: { result in
// handling result
}, receiveValue: {
let response = try await APIClient.fetch()
// handling response
})
.store(in: &cancellable)
subject.send(())
今回はエラーがないパターンで行なっているのであれですが、receiveValue
でも Concurrency
を実装可能です。
終わりに
長くなりましたが、こちらを諸々実装したライブラリを作成しましたので、お手軽にご使用いただけますと幸いです!
追加の拡張やPRお待ちしてます!
スターください!
追記
Task のキャンセル対応のために新しい記事を追記しました!
その他
追加の拡張
RxConcurrency
には他の拡張実装があるので、参考にしてみると良いでしょう。
参考文献