- 最近SwiftConcurrencyに既存プロジェクトを置き換えたりする中で色々と調べたので備忘録として書き残します。
- 主に
async/await
,Async(Throwing)Stream
,Task
についての記事になります。
- 主に
async/awaitの基本的な使い方
asyncとawaitについて
-
async
はメソッドが非同期作業を実行することを明確にするメソッド属性です。- 例としてAppleが提供しているTask型のstaticメソッドsleepなどがあります。
public static func sleep(nanoseconds duration: UInt64) async throws
-
await
はasync
な非同期メソッドを呼び出す際に使われるキーワードです。
try await Task.sleep(nanoseconds: UInt64(1 * 1_000_000_000))
- このようにasyncとawaitは対をなす関係にあります。
直列実行パターン
e.g. API1からUser一覧を取得してAPI2からUserの詳細を取得するケース
-
処理は上から順番に実行されます。
- User一覧の取得が終わってから、その値を用いてUser詳細を取得することができます。
sleepメソッドのfinishがコンソールに出力されるまで3秒かかるサンプル
- このように直列実行のため
sleep()
メソッドの全体の実行時間は3秒かかります。
並列実行
- 並列実行の方法は
async let
とwith(Throwing)TaskGroup(of:
を用いた書き方があります
async letパターン
e.g. 複数のAPIを叩いてそれぞれの結果を合成したモデルを返すケース
-
async let
の宣言部ではawait
キーワードは不要で、その値を実際に使いたい時にawait
を記述します。- また、
async let
で宣言した瞬間に非同期処理が走ります。
- また、
// 非同期処理が宣言とともに走る
async let taskA = fetchA()
async let taskB = fetchB()
// 上記で宣言したtaskA, taskBの結果を用いる場合にawait構文が必要になる
let (resultA, resultB) = try await (taskA, taskB)
let resultC = try await fetchC(a: resultA, b: resultB)
- 下記sleepメソッドの
finish
が出力されるのは並列実行のため2秒です。
func sleep() async throws {
async let sleep1: () = Task.sleep(nanoseconds: UInt64(1 * 1_000_000_000)) // 1秒
async let sleep2: () = Task.sleep(nanoseconds: UInt64(2 * 1_000_000_000)) // 2秒
_ = try await (sleep1, sleep2)
print("finish")
}
- 注意点として下記のような書き方もできますが、これは並列にならず直列実行になってしまいます。
async let (taskA, taskB) = (fetchA(), fetchB())
let (resultA, resultB) = try await (taskA, taskB)
- また、
async let
で宣言した変数を使うときにawait
を書きますが、複数回その値を使いたい場合に毎度await
を書いても非同期処理そのものは宣言した瞬間に一度走るだけなのでawait
を書くたびに非同期処理が走ることはありません。
async let value = fetchA()
// 特にfetchA()の処理が2回走ってしまうことはない
let result1 = fetchResult1(value: await value)
let result2 = fetchResult2(value: await value)
async letな変数を使わずにスコープを抜けた場合
-
あるfunction内で宣言した
async let
な変数をtry await
などせずにそのfunctionのスコープを抜けた場合にどのような挙動になるでしょうか。 -
答えはスコープを抜けた時点でまだ
async let
で宣言した処理が完了していなかった場合は、その処理がキャンセルされます。
func sleepTask() async throws {
print("sleep start")
do {
try await Task.sleep(nanoseconds: 2_000_000_000) // 2秒待つ処理
} catch {
print(error.localizedDescription)
}
print("sleep finish")
}
func asyncSleep() async throws -> String {
print("start asyncSleep")
async let sleep1 = sleepTask()
async let sleep2 = sleepTask()
print("end asyncSleep")
return "finish"
}
// 呼び出しもと
Task {
do {
let result = try await asyncSleep()
print("result: " + result)
} catch {
print(error.localizedDescription)
}
}
コンソールの出力結果
start asyncSleep
end asyncSleep
sleep start
sleep start
The operation couldn’t be completed. (Swift.CancellationError error 1.)
sleep finish
The operation couldn’t be completed. (Swift.CancellationError error 1.)
sleep finish
result: finish
-
なぜこういった挙動になるのかというと、
async let
で宣言したものは後述する構造化されたTaskとして扱われるからだと思われます。- スコープを抜けても処理を中断するのではなく、そのまま処理を続けさせたい場合は
async let
ではなく、新規でTask {}
を作り構造化されていないTaskとして扱う必要があります。
- スコープを抜けても処理を中断するのではなく、そのまま処理を続けさせたい場合は
with(Throwing)TaskGroupパターン
e.g. あるAPIの返り値である可変長配列をもとにその数分の並列処理をしたいケース
withThrowingTaskGroup(of:returning:body:)
-
第一引数にはgroup.addで追加した子タスクの処理の返り値の型を指定します。
UserDetail.self
-
第二引数にはwithThrowingTaskGroup関数自体が返す戻り値の型です。
- [UserDetail].selfだが書き方によって省略可能
-
第三引数には子タスクを使った並列処理のクロージャを書きます。
- クロージャにThrowingTaskGroup型の引数(group)が渡ってくるので、このgroupに並列処理を追加していくことになります。
-
なお、子タスク内の処理でエラーが起きてもその時点ではエラーをthrowしないが、groupに対して
reduce
,waitForAll()
,next()
などを呼び出した際にエラーがrethrowされるようになっているので子タスクのエラーハンドリングをしたい場合は注意が必要です。 -
子タスク内で戻り値がVoid型であるAPIなどを叩いていて、特にgroupの処理を待っていないコードだと子タスクのAPIのエラーが伝播されません。
-
withThrowingTaskGroup
のクロージャの引数であるgroupにaddメソッドで子タスクを追加して並列処理を走らせることが可能だが、これはgroupに追加された子タスクがそれぞれ並列で処理されるだけなので子タスクの中で直列に書いたものは直列実行されます。
let number = try await withThrowingTaskGroup(of: Int.self) { group in
[1, 2, 3].forEach { num in
// 最終的に3つの子タスクが作られる
// 子タスク内の処理は直列で書かれているので、それぞれの子タスクの完了時間は6秒となる
// 子タスク同士は並列に動作するので全ての子タスクが完了する時間も6秒
group.add {
_ = try await Task.sleep(UInt64(1 * 1_000_000_000))
_ = try await Task.sleep(UInt64(2 * 1_000_000_000))
_ = try await Task.sleep(UInt64(3 * 1_000_000_000))
return num
}
}
return try await group.reduce(into: 0) { result, num in
result += num // 6
}
}
- 上記の子タスクが全て終わる時間を3秒にしたい場合は子タスク内の処理を
async let
で記述すれば良いです。
group.add {
async let task1 = try await Task.sleep(UInt64(1 * 1_000_000_000))
async let task2 = try await Task.sleep(UInt64(2 * 1_000_000_000))
async let task3 = try await Task.sleep(UInt64(3 * 1_000_000_000))
_ = try await (task1, task2, task3)
return num
}
- なお、Errorをthrowしないバージョンの
withTaskGroup(of:returning:body:)
も存在します。
AsyncStream(AsyncThrowingStream)
- SwiftではSequenceプロトコルに準拠することで
forEach
,map
,filter
,reduce
などの関数やfor-in
文も使えるようになりますが、これのasync/await
に対応した非同期バージョンとしてAsyncSequenceプロトコルというものがあります。
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@rethrows public protocol AsyncSequence {
/// The type of asynchronous iterator that produces elements of this
/// asynchronous sequence.
associatedtype AsyncIterator : AsyncIteratorProtocol
/// The type of element produced by this asynchronous sequence.
associatedtype Element where Self.Element == Self.AsyncIterator.Element
/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
///
/// - Returns: An instance of the `AsyncIterator` type used to produce
/// elements of the asynchronous sequence.
func makeAsyncIterator() -> Self.AsyncIterator
}
- 例としてURL型にはlinesというプロパティが生えていて、このプロパティの型はAsyncSequenceプロトコルに準拠したAsyncLineSequenceという型が定義されています。
public var lines: AsyncLineSequence<URL.AsyncBytes> { get }
- これを使うことで指定したURLからその内容を非同期で1行ずつ取得することが可能になります。
Task {
let url = URL(string: "https://www.apple.com/jp/")!
for try await line in url.lines {
print(line + "🌟")
}
}
- 出力
<!DOCTYPE html>🌟
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="ja-JP" lang="ja-JP" prefix="og: http://ogp.me/ns#" class="no-js" data-layout-name="apple-trade-in-event">🌟
<head>🌟
🌟
<meta charset="utf-8" />🌟
<link rel="canonical" href="https://www.apple.com/jp/" />🌟
🌟
-
このようにAsyncSequenceプロトコルに準拠することで非同期処理の1つ1つが完了したタイミングで値を取得でき、
map
やreduce
などを使ってその値を簡単に変換することが可能になります。 -
AsyncStreamを使うとカスタムな型をAsyncSequenceプロトコルに容易に準拠させることが可能です。
-
このようにAsyncStreamでラップすることでAsyncSequenceプロトコルに準拠したものを簡単に作ることができる
-
AsyncStreamのinitのクロージャにContinuationという型の引数(continuation)が渡ってくるので、continuationに対して
yield(_ value:)
もしくはfinish()
を流します。-
yield(_ value:)
にはAsyncStreamのジェネリクスで定義した型の値を流します。 -
continuationにErrorを流すことのできる
finish(throwing: )
はAsyncThrowingStreamを使うことで可能です。
-
-
AsyncStreamに
finish()
orfinish(throwing: )
が流れた時点でStreamが終了します。-
AsyncStreamをfor-loopなどで使っていた側のloop処理を抜けるということです。
-
逆にいうと
finish()
orfinish(throwing: )
を流さない場合、呼び出し側のfor-loopが終わらないので注意が必要となります。 -
AsyncStreamを実行しているTaskをキャンセルしてもfor-loopを抜けることができるが、その後にキャンセルをハンドリングして適切な
finish(throwing: )
を呼ばないとAsyncStreamを実行しているTask自体が正常終了と見なされてしまいます。(Task章で後述)
-
onTermination
AsyncStreamはダウンロードプログレスのような処理に最適
-
AsyncSequenceの特徴として非同期処理の1つ1つが完了したタイミングで通知がきて、エラーが起きると以降の処理は通知されません。
-
この特性はダウンロードの完了までにプログレス表示をするようなケースに使えるものです。
-
例えば既存のAPIでファイルをダウンロードするような下記のFileDownloaderというコードがあるとします。
Taskについて
-
Task型
-
@frozen struct Task<Success, Failure> where Success : Sendable, Failure : Error
-
イニシャライザ
@discardableResult init(priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Success)
-
イニシャライザ
-
func createTask() -> Task<Int, Error> {
Task {
try await Task.sleep(nanoseconds: UInt64(1 * 1_000_000_000))
return 1
}
}
-
ジェネリクスで定義した
Success, Failure
型の値を保持します。-
クロージャ内でその型を返します。
-
try await
の箇所でErrorがthrowされた場合、TaskのFailure
型の方にそのエラーが代入されます。
-
当初のTaskの役割やイメージ
-
async
付きのメソッドを並行性をサポートしていない同期環境から呼び出そうとしたときに出るエラーで'async' in a function that does not support concurrency
というエラーが出ます。
-
これを回避するには同じく
async
付きのメソッドから呼び出すかTask {}
で囲む必要があります。- アプリケーションの根源は同期環境から始まるので
async
なメソッドを呼び出す際にどこかで必ずTaskを使う必要があります。
- アプリケーションの根源は同期環境から始まるので
-
なので手っ取り早く
'async' in a function that does not support concurrency
を回避できるもの・・- (^p^)<なんかよく分からないけどTaskで囲んだらなおった・・
- という程度の認識でした。
- ここからもう一歩理解を進めてみました。
Taskとは何なのか
- タスクとはプログラムの一部として非同期で実行できる作業の単位であり、すべての非同期コードは何らかのタスクの一部として実行されます。
-
2つのTaskを作って、そのTask内で非同期のsleepを呼び出すと各々のTaskが並列に実行されます。
-
メソッド内の順序としては出力の内容の通りでTaskの非同期処理が終わる前にこのメソッドを抜けます。
-
各Taskは並列に実行されるので2秒後に最後の非同期処理の
finish sleep 2 second
が出力されます。 -
Taskは生成後すぐに実行され明示的な開始の必要はありません。
i. ただしTaskのハンドリングをしないと処理を非同期で投げっぱなしただけになるのであまり使い所はない気がします。
(RxSwiftのdoオペレータのように成功・失敗関係なくとりあえず非同期にログを送っておくみたいな処理には良いかもしれませんが)
Taskのハンドリング
- では下記のような3つのTaskの非同期処理が完了してから関数の返り値であるInt型を返すようにするにはどうすれば良いでしょうか。
- 出力の通りこの書き方だと関数を抜けて
3
が出力されてから各々の非同期処理が完了しています。
-
私が最初に思いついたのは、この関数を
async
にして呼び出し側にawait
を書くことでした。1-2. しかしながらこれでは先ほどと同じ結果になってしまいました。
Taskで実行した非同期処理の値を取得できるresult
とvalue
プロパティ
-
関数を各Taskの非同期処理が終わってからInt型を返すようにしたい場合は、Task型に生えている
result
もしくはvalue
というプロパティがasync
になっているのでこれを使うことで非同期処理の完了を待つことができます。-
result
プロパティはResult型になっていてSuccessもしくはErrorの値が取得できます。 -
value
プロパティはジェネリクスで定義したSuccess型の値の取得を試みますがTaskがエラーをスローした場合、このプロパティはそのエラーを伝搬します。
-
public var result: Result<Success, Failure> { get async }
public var value: Success { get async throws }
-
出力を見てみると
-
全ての非同期Taskが終わってから
3
が返されるようになっています。 -
各Taskが並列ではなく直列で実行されるようになっているので
Task.sleep
コードが上から2秒、1秒、3秒の順で終わり全てのTaskの処理が完了するまで合計で6秒かかるようになっています。
-
-
各Taskを並列に実行した上で完了を待ちたい場合
-
元々Taskは宣言した瞬間に実行されるので各Taskを変数に保持してタプルで一気に
try await
してあげると並列になります。 -
または、各Taskの
result
をasync let
で宣言して、await
することで並列実行になる- 関数の完了は3秒になる
Taskのエラーハンドリング
-
Taskの
result
プロパティには非同期処理の結果が格納されます(Success
orFailure
)。 -
Task内の非同期処理でErrorがthrowされた場合は下記のようにエラーハンドリングできます。
value
プロパティを使った呼び出しの場合
複数のTaskでエラーが発生した際にどのエラーがハンドリングされるのか
-
書き方(
value
,result
,try?
など)によるが直列と並列のシンプルなパターンでの順番を記載します。 -
直列の場合
-
各Taskを直列に実行した場合、最初にエラーが発生したTaskのErrorがthrowされ、次のTaskは処理されません。
-
並列の場合
Taskのキャンセル処理
-
Taskには
cancel()
メソッドがありTaskの処理を停止することができます。- 標準APIであるTaskのsleepメソッドはスリープ中にcancel()が実行されると
CancellationError
をthrowするような設計になっています。
- 標準APIであるTaskのsleepメソッドはスリープ中にcancel()が実行されると
public static func sleep(nanoseconds duration: UInt64) async throws
- このメソッドを使ってキャンセル処理をすると下記のようになります。
-
同じようにネットワークリクエストで
public func data(from url: URL, delegate: URLSessionTaskDelegate? = nil) async throws -> (Data, URLResponse)
を実行中にcancel()
を使うとNSURLErrorDomainのcancelledがthrowされているようになっています。-
ただしタスクをキャンセルしても、その場で即座に処理が停止するわけではありません。
-
タスクのキャンセルはタスクに "キャンセルされた" というフラグを立てるだけで、本当にキャンセル実行するのかどうかは各タスク側で明示的に確認する必要があるみたいです。
-
この仕様を 「協調的なキャンセル」(Cooperative cancellation)というらしいです。
-
- このようにTaskがキャンセルされた時にどういった処理をするかということを自前の非同期処理に実装することができます。
Taskのキャンセルを検知する
- 自身の非同期処理(Task)のキャンセルを検知するAPIは下記があります。
public var isCancelled: Bool { get }
public static var isCancelled: Bool { get }
public static func checkCancellation() throws
public func withTaskCancellationHandler<T>(operation: () async throws -> T,
onCancel handler: @Sendable () -> Void) async rethrows -> T
public func withTaskCancellationHandler<T>(handler: @Sendable () -> Void,
operation: () async throws -> T) async rethrows -> T
- 例えばあるファイルをネットワークからダウンロードして、そのデータをさら別の形式に変換する重い処理をするTaskがあったとして、これらのAPIを使ってキャンセルチェックを行うことで不要な処理を減らせるようになります。
static var isCancelled: Bool { get }
-
Taskがキャンセルされたかどうかを返します。
- staticプロパティはTaskのクロージャ内でのみ自身がキャンセルされたかどうかを判定できます。
-
下記はファイルダウンロード後にキャンセルチェックをしてキャンセルされていた場合は独自定義のErrorをthrowする場合のコードです。
- これにより不要な重い処理を回避することができます。
public static func checkCancellation() throws
- Taskがキャンセルされていた場合に
CancellationError
をthrowするメソッドです。
public func withTaskCancellationHandler(operation: () async throws -> T, onCancel handler: @Sendable () -> Void) async rethrows -> T
-
これまではキャンセルを自ら定期的にチェックする方法でしたが、
withTaskCancellationHandler
を使うことでキャンセルされたタイミングを即座に検知することができます。- ただし、キャンセルを即座に検知できるだけで
operation:
クロージャに書いた処理は中断されることなく続くので、キャンセルを即座に検知した際に自前で中断できるような書き方をする必要があります。
- ただし、キャンセルを即座に検知できるだけで
-
public func withTaskCancellationHandler<T>(handler: @Sendable () -> Void, operation: () async throws -> T) async rethrows -> T
はoperation:
とonCancel handler:
の引数の位置が逆になっただけのものなので割愛
構造化されたTask
-
各々のTaskのライフサイクルやキャンセルが管理されている構造のこと(親子関係)です。
-
構造化されていることで親Taskの
cancel()
メソッドを呼び出すと、そのTaskに関連する子Taskのキャンセルも自動で呼び出されます。 -
構造化されたTaskを作れるのは
async let
とwith(Throwing)TaskGroup
を使った場合のみです。
-
-
下記のように親タスクの
cancel()
を呼び出すことでasync let
で内部的に作られた子タスクにもキャンセルが伝播しtry await URLSession.shared.data(from:
がキャンセル時にthrowするNSURLErrorDomainのcancelledが出力されていることが確認できます。
構造化されていないTask
-
各々のTaskが独立しておりライフサイクルやキャンセルが共有されていない構造のことです。
-
関連性がないのであるTaskの
cancel()
メソッドを呼び出しても他のTaskに通知がいくことはありません。 -
Task型のイニシャライザや
Task.detached
などを用いて作成したものは全て構造化されていないTaskになります。(async let
やwith(Throwing)TaskGroup
以外)
-
-
async let
を使っても新たなTaskを生成しているような場合は構造化された関係にはなりません。
Async(Throwing)Streamの場合のキャンセル処理
-
AsyncStreamを実行しているTaskがキャンセルされるとStreamが終了する(for-loopを抜ける)が、
onTermination
クロージャの中でキャンセルを検知して明示的にstreamにfinish(throwing: )
を流さないとエラーが呼び出し元にthrowされません。
-
onTerminationでキャンセルを検知してStreamに
finish(throwing: )
を流した場合
- これによりTaskのresultもfailureになりStream側でthrowしたエラーになっていることが確認できます。
Async(Throwing)Streamクロージャ内で定義しているTaskについて
-
AsyncStreamのイニシャライザでstreamに値を流すクロージャは
async
なクロージャになっていないので、非同期処理を呼び出す時にはTaskを使う必要があります。 -
これはAsyncStreamを使う外側のTaskとAsyncStream内の非同期処理のTaskが構造化されていないことを表しています。
AsyncStream内のTask(非同期処理)が処理され続ける例
-
AsyncStreamを使う側のtaskをキャンセルしているのでStreamに対しての
continuation.yield("a")
などの値は流れてきませんが、コンソールにはfinish sleep 1
などが出続けることからAsyncStream内のTask処理がキャンセルされていないことが分かります。- このマズさはダウンロードをキャンセルする場合などを想像してみると明白です。
-
これを回避するにはStreamの
onTermination
でキャンセルを検知し、AsyncStream内で定義したTaskのインスタンスに対してキャンセルを実行する必要があります。
-
innerStreamTask
をキャンセルすることでコンソールにfinish sleep 1
などが出なくなりました。-
innerStreamTask
のTask.sleep
がキャンセルを検知したことによりCancellationError
がthrowされたためです。- なお、throwされたCancellationErrorは
innerStreamTask
のresultに格納されますが、このエラーがAsyncStreamの呼び出し元に伝搬されることはありません。(呼び出し元に伝わるのはコンソールに表示されている通りcontinuation.finish(throwing: MyError.e1)
です。)
- なお、throwされたCancellationErrorは
-
-
一連の理解を経て私は、Taskのキャンセルを意識した設計がかなり重要なんだなと感じました。
- asyncなメソッドはTaskを作らずにasyncな処理を呼び出せるので、意図せず
構造化されていないTask
を作ってしまうことを回避できる役割があるのだなとも思いました。
- asyncなメソッドはTaskを作らずにasyncな処理を呼び出せるので、意図せず
まとめ
いかがでしたでしょうか。
まだまだ調べきれていないことや説明できていない機能が盛りだくさん(Actorについて一切触れておらず)ですが、この備忘録程度の記事が何かのお役に立てれば幸いです。
参考にさせていただいたドキュメントや記事など
- https://zenn.dev/akkyie/articles/swift-concurrency
- https://www.swiftbysundell.com/articles/the-role-tasks-play-in-swift-concurrency/
- https://www.avanderlee.com/concurrency/tasks/
- https://www.swiftbysundell.com/articles/swift-concurrency-multiple-tasks-in-parallel/
- https://www.swiftbysundell.com/articles/async-sequences-streams-and-combine/
- https://www.donnywals.com/understanding-swifts-asyncsequence/
- https://www.avanderlee.com/swift/asyncthrowingstream-asyncstream/
- https://developer.apple.com/documentation/swift/swift_standard_library/concurrency