参考書籍
検証で利用するUtilクラスとError
struct Util {
/// 現在実行中のスレッドを出力する
/// - Parameter point: 実行ポイント
static func printCurrentThread(_ point: String) {
let threadName: String = Thread.isMainThread ? "メイン" : "ワーカー"
print("-- \(point): \(threadName)スレッド \(Thread.current.description) で実行中 --")
}
/// 実行開始と`sec`[秒]待機後の終了を通知する同期関数
/// - Parameters:
/// - taskName: タスクの名称
/// - sec: 実行待機時間[秒]
static func waitAndPrintSync(taskName: String, _ sec: Double = 1) {
print("\(taskName): start waiting for \(sec)[s]")
Thread.sleep(forTimeInterval: sec)
print("\(taskName): end")
}
/// 実行開始と`sec`[秒]待機後の終了を通知する非同期関数
/// - Parameters:
/// - taskName: タスクの名称
/// - sec: 実行待機時間[秒]
static func waitAndPrintAsync(taskName: String, _ sec: Int = 1) async {
print("\(taskName): start waiting for \(sec)[s]")
try? await Task.sleep(until: .now + .seconds(sec), clock: .continuous)
print("\(taskName): end")
}
/// プログラム実行時間(Program Execution Time; PET)を計測するオブジェクト
struct PETTracker {
/// `action`の実行時間を計測する
/// - parameter action: 実行時間を計測するブロック
static func track(_ action: () async -> Void) async {
let start: Date = Date()
await action()
let end: Date = Date()
let span: TimeInterval = end.timeIntervalSince(start)
print("実行時間: \(span)[s]")
}
}
}
/// ユーザ定義エラー
enum TestError: Error {
case Error
}
直列実行と静的な並列実行
Task
の並列実行を行う手法は以下の2つ。
-
async let
バインディング(下記に示す)を用いる -
TaskGroup
(後述)を用いる
async let
バインディングは並列実行を行うTask
の個数を静的に指定する一方で、TaskGroup
を用いることで動的個数のTask
を実行することができる。
/// async関数を直列・並列に実行する
func compareSerialWithParallel() {
Util.printCurrentThread("①") // -- ①: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
Task.detached {
Util.printCurrentThread("②") // -- ②: ワーカースレッド <NSThread: 0x6000011e4a80>{number = 6, name = (null)} で実行中 --
// 直列処理
await Util.waitAndPrintAsync(taskName: "Serial-1")
await Util.waitAndPrintAsync(taskName: "Serial-2")
await Util.waitAndPrintAsync(taskName: "Serial-3")
Util.printCurrentThread("③") // -- ③: ワーカースレッド <NSThread: 0x6000011dc700>{number = 4, name = (null)} で実行中 --
// 並列処理
async let first: Void = Util.waitAndPrintAsync(taskName: "Parallel-1")
async let second: Void = Util.waitAndPrintAsync(taskName: "Parallel-2")
async let third: Void = Util.waitAndPrintAsync(taskName: "Parallel-3")
await (first, second, third)
Util.printCurrentThread("④") // -- ④: ワーカースレッド <NSThread: 0x6000011c2380>{number = 5, name = (null)} で実行中 --
}
Util.printCurrentThread("⑤") // -- ⑤: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
}
-- ①: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
-- ②: ワーカースレッド <NSThread: 0x6000011e4a80>{number = 6, name = (null)} で実行中 --
Serial-1: start waiting for 1[s]
-- ⑤: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
Serial-1: end
Serial-2: start waiting for 1[s]
Serial-2: end
Serial-3: start waiting for 1[s]
Serial-3: end
-- ③: ワーカースレッド <NSThread: 0x6000011dc700>{number = 4, name = (null)} で実行中 --
Parallel-1: start waiting for 1[s]
Parallel-2: start waiting for 1[s]
Parallel-3: start waiting for 1[s]
Parallel-3: end
Parallel-1: end
Parallel-2: end
-- ④: ワーカースレッド <NSThread: 0x6000011c2380>{number = 5, name = (null)} で実行中 --
データ競合を保証するSendable
参考: Sendable and @Sendable closures
Sendable
プロトコルはデータ競合が発生しないことを示すためのプロトコルである。
Sendable
プロトコルに準拠させる場合、データ競合が発生しないデータとなるよう以下のいずれかの条件を満たしている必要がある。
- 全てのメンバ・連想値が
Sendable
プロトコルに準拠した値型データ - サブクラス含め
let
で定義された不変メンバしか持たない不変クラス -
NSLock
やDispatchQueue
・DispatchSemaphore
を用いたロック、Swift Atomics
を用いたアトミックを採用した内部同期クラス -
ディープコピー
を行うディープコピークラス
Actor
外のTask
で呼ばれる、Concurrently(並行的) に動作するローカル関数には、データ競合を避けるため@Sendable
アノテーションを付与する必要がある。
Swift 5.7では警告で済むものの、Swift 6以降はエラーになる。
Concurrently-executed local function 'newAsync(taskName:)' must be marked as '@Sendable'; this is an error in Swift 6
@escaping
な同期関数 → asyncな非同期関数 の変換
Swift Concurrency
に未対応の非同期メソッドをConcurrency
に組み込む場合、その非同期メソッドがエラーをthrowするかどうかに応じて以下の2つのメソッドを使い分ける。
withCheckedContinuation(function:_:)
withCheckedThrowingContinuation(function:_:)
エラーが発生しない同期関数の場合
/// `completionHandler`を用いた従来の同期関数を`async`を用いた非同期関数で実行する(エラー送出なし)
func execCompletionHandlerInConcurrentContext() {
/// `completionHandler`を用いた従来の同期関数
/// - Parameters:
/// - taskName: タスクの名称
/// - completionHandler: 非同期で実行される無名関数
@Sendable func oldAsync(taskName:String, completionHandler: @escaping () -> Void) {
Util.waitAndPrintSync(taskName: taskName)
completionHandler()
}
/// `async`を用いた新たな非同期関数
/// - Parameter taskName: タスクの名称
@Sendable func newAsync(taskName: String) async {
return await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
oldAsync(taskName: taskName) {
continuation.resume()
}
}
}
Task.detached {
await newAsync(taskName: "Task")
}
}
execCompletionHandlerInConcurrentContext()
Task: start waiting for 1.0[s]
Task: end
エラーが発生する同期関数の場合
/// `completionHandler`を用いた従来の同期関数を`async`を用いた非同期関数で実行する(エラー送出あり)
func execCompletionHandlerThrowingErrorInConcurrentContext() {
/// `completionHandler`を用いた従来の同期関数
/// - Parameters:
/// - taskName: タスクの名称
/// - completionHandler: 非同期で実行される無名関数
@Sendable func oldAsync(taskName: String, completionHandler: @escaping (Result<Void, any Error>) -> Void) {
Util.waitAndPrintSync(taskName: taskName)
completionHandler(.failure(NSError(domain: "TestError", code: -1)))
}
/// `async`を用いた新たな非同期関数
/// - Parameter taskName: タスクの名称
/// - Throws: `NSError`
@Sendable func newAsync(taskName: String) async throws {
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
oldAsync(taskName: taskName) { (result: Result<Void, any Error>) in
continuation.resume(with: result)
}
}
}
Task.detached {
do {
try await newAsync(taskName: "Task")
}
catch {
print(error.localizedDescription)
}
}
}
execCompletionHandlerThrowingErrorInConcurrentContext()
Task: start waiting for 1.0[s]
Task: end
The operation couldn’t be completed. (TestError error -1.)
データ競合を防ぐが競合状態は防げないActor
参考: データ競合(data race)と競合状態(race condition)を混同しない
データ競合(Data Race)
… 複数スレッド上で少なくともいずれかは書き込みである、同一変数に対する同時アクセスが発生することで、その出力結果(または動作)が予期できない事象
UsernameCreator
をactorではなくclassにした場合、実行結果は以下のように出力される。
↑ Task AによるRead中に、Task BでWriteが行われた?
Task A: Opt Redvuv
(処理が中断され、以下のwarningが出力される)
Playground execution terminated because the process stopped unexpectedly.
競合状態(Race Condition)
… 複数スレッド上で操作が非同期的(ただし同時ではない)に行われることで、実装の想定外の結果になる事象
以下の例では、2つのTaskで
creator.username(userId: 1)
と入力しているため、Task AとTask Bは同じ出力結果になるはずにもかかわらず、実際には異なるusernameが出力されている。これは、以下の順序で操作が行われたことが原因である。
① Task Aで
username(userId:)
が実行開始
② Task Aでキャッシュが存在しないと判定される
③ Task Aで非同期的にユーザ名を生成するため、username(userId:)
がawait状態になる(ここでTask Bでのusername(userId:)
が実行可能になる)
④ Task Bでusername(userId:)
が実行開始
⑤ Task Bでキャッシュが存在しないと判定される ← この実装が競合状態の原因
⑥ Task Bで非同期的にユーザ名を生成するため、username(userId:)
がawait状態になる(ここでTask Aでのusername(userId:)
が再開可能になる)
⑦ Task Bで生成されたユーザ名がキャッシュに保管・出力される(ここでTask Bが終了し、Task Aが再開可能になる)
⑧ Task Aで生成されたユーザ名がキャッシュに保管・出力される(ここでTask Aが終了)
Task A: Og Ejmoada
Task B: Ra Mbdbhdg
/// **データ競合**を防ぐが**競合状態**にはなりうるActor
actor UsernameCreator {
/// キャッシュ
private var cache: [Int : String] = [Int : String]()
/// `userId`に紐づくユーザ名を取得する
/// - Parameter userId: ユーザID
/// - Returns: ユーザIDに紐づくユーザ名
func username(userId: Int) async -> String {
// キャッシュに保管されたusernameがあればそれを返却
if let cached: String = cache[userId] {
return cached
}
// キャッシュになければ半角スペース含めて10字のユーザ名を生成
let randomizedUsername: String = await UsernameCreator.random(length: 10)
// キャッシュに保管しusernameを返却
cache[userId] = randomizedUsername
return cache[userId]!
}
/// 指定文字数のランダムなユーザ名を生成する
/// - Parameter length: ユーザ名のスペースを含む文字数
/// - Returns: ランダムに生成されたユーザ名
static func random(length: Int) async -> String {
try? await Task.sleep(until: .now + .seconds(2), clock: .continuous)
let uppercase: String = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
let lowercase: String = "abcdefghijklmnopqrstuvwxyz"
let spaceIndex: Int = length <= 1 ? -1 : Int.random(in: 2 ..< length)
var username: String = ""
for i in 1 ... length {
switch i {
case 1, spaceIndex + 1:
username += String(uppercase.randomElement()!)
case spaceIndex:
username += " "
default:
username += String(lowercase.randomElement()!)
}
}
return username
}
}
/// 競合状態を発生させる
func causeRaceCondition() {
let creator: UsernameCreator = UsernameCreator()
Task.detached {
print("Task A: \(await creator.username(userId: 1))")
}
Task.detached {
print("Task B: \(await creator.username(userId: 1))")
}
}
causeRaceCondition()
Task A: Zsiw Xtksm
Task B: Vqp Hsdgvo
なぜactorは競合状態を防げないのか
参考: Swift Concurrency まとめ(正式版対応済) - アクターの再入可能性(reentrancy)
actor
で定義されたプロパティ・メソッドに対して、複数スレッド上での同時アクセス・実行ができないため、一見すると競合状態を防げているように思える。
actor RaceConditionCauser {
var num: Int = 0
func updateIfNumEqualsZero() async {...}
}
しかし、actorで定義されたメソッドの「同時実行ができない」というのは、actorメソッド内の一行の操作を同時に行えるのが一つのスレッドのみということであり、実装によっては競合状態が発生する可能性がある。
/// `num`が0であれば0以外の整数値、0でなければその値を返却する
func updateIfNumEqualsZero() async -> Int {
guard num == 0 else { return num } // ① num == 0かどうかを評価し、0でない場合は早期Return
num = await generateIntExceptForZero() // ② 0以外の整数値を非同期で生成し、numに代入
return num // ③ 生成した0以外の整数値であるnumを返却
}
actorで定義されたメソッドはawait
状態になるまで他スレッドでの実行を防ぐものの、await
によって待機状態になると処理の途中であっても他スレッドから実行することができる(= 再入可能(reentrant)
である)ため、操作順序によっては意図しない動作(競合状態)を引き起こす可能性がある。
ワーカースレッド(Task) | Taskの内容 |
---|---|
A |
updateIfNumEqualsZero() が実行開始、他スレッドによる実行を禁止 |
A | ①が実行 ※ num == 0
|
A | ②のgenerateIntExceptForZero() が実行開始 |
A | 他スレッドによるupdateIfNumEqualsZero() の実行を許可(await状態) |
B |
updateIfNumEqualsZero() が実行開始、他スレッドによる実行を禁止 |
B | ①が実行 ※ num == 0
|
B | ②のgenerateIntExceptForZero() が実行開始 |
B | 他スレッドによるupdateIfNumEqualsZero() の実行を許可(await状態) |
B |
updateIfNumEqualsZero() を再開、他スレッドによる実行を禁止 |
B | 生成した乱数値をnum に代入しreturn
|
B | 他スレッドによるupdateIfNumEqualsZero() の実行を許可(実行終了) |
A |
updateIfNumEqualsZero() を再開、他スレッドによる実行を禁止 |
A | 生成した乱数値をnum に代入しreturn
|
A | 他スレッドによるupdateIfNumEqualsZero() の実行を許可(実行終了) |
競合状態を防ぐTask
競合状態を防ぐには、以下の手法が挙げられる。
① 直列実行する
② await
後に再度キャッシュをチェックする (await
によって状態が変化している可能性があるため)
③ Taskの実行結果ではなく、Task
をキャッシュしtask.value
を`awaitで取得する
ここでは、③の手法で実装する。
/// 競合状態を防ぐActor
actor UpdatedUsernameCreator {
private enum CacheEntry {
case inProgress(Task<String, Never>)
case ready(String)
}
/// キャッシュ
private var cache: [Int : CacheEntry] = [Int : CacheEntry]()
/// `userId`に紐づくユーザ名を取得する
/// - Parameter userId: ユーザID
/// - Returns: ユーザIDに紐づくユーザ名
func username(userId: Int) async -> String {
// CacheEntryのキャッシュがあればそこからusernameを取得
if let cached: CacheEntry = cache[userId] {
switch cached {
case .ready(let username):
return username
case .inProgress(let task):
return await task.value
}
}
// CacheEntryのキャッシュがなければキャッシュにTaskを追加
let task: Task<String, Never> = Task {
await UsernameCreator.random(length: 10)
}
// inProgress状態でキャッシュに保管
cache[userId] = .inProgress(task)
// Suspension Pointを設定することで他スレッドでのusername(userId:)を実行可能にする
let username: String = await task.value
cache[userId] = .ready(username)
return username
}
}
/// 競合状態が発生しない
func neverCauseRaceCondition() {
let creator: UpdatedUsernameCreator = UpdatedUsernameCreator()
Task.detached {
print("Task A: \(await creator.username(userId: 1))")
}
Task.detached {
print("Task B: \(await creator.username(userId: 1))")
}
}
neverCauseRaceCondition()
Task B: Xd Ulusmos
Task A: Xd Ulusmos
非同期で要素をイテレートするAsyncSequence
メインスレッド
でループ処理を実行 かつ イテレート処理を非同期的に行うシーケンスを作成する場合、AsyncSequence
に準拠させる。
/// イテレート処理を非同期で行う`AsyncSequence`
struct AlphabetAsyncSequence: AsyncSequence {
/// `AsyncIteratorProtocol`に準拠する型エイリアス
/// - note: ※ `AsyncIterator`という名前で宣言する場合は不要
typealias AsyncIterator = AlphabetAsyncIterator
/// シーケンスの要素の型エイリアス
/// - note: `AsyncSequence.Element == AsyncSequence.AsyncIterator.Element`
typealias Element = Character
/// 全体集合となる文字列
let input: String
init(input: String) {
self.input = input
}
/// `AsyncSequence`の次の要素を**非同期**で特定する`AsyncIterator`
struct AlphabetAsyncIterator: AsyncIteratorProtocol {
/// シーケンスの要素の型エイリアス
/// - note: `AsyncSequence.Element == AsyncSequence.AsyncIterator.Element`
typealias Element = Character
/// 全体集合となる文字列
let input: String
static let alphabets: String = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
/// 現在の要素のインデックス番号
var index: Int = 0
init(input: String) {
self.input = input
}
/// 次のアルファベットである要素を**非同期**で特定する
/// - Returns: 次のアルファベット要素
mutating func next() async -> Character? {
var nextCharacter: Character?
while index < input.count {
let sliced: Character = input[String.Index(utf16Offset: index, in: input)]
index += 1
if AlphabetAsyncIterator.alphabets.contains(sliced) {
nextCharacter = sliced
Util.printCurrentThread("AlphabetAsyncIterator#next()") // -- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --
break
}
}
return nextCharacter
}
}
/// 次の要素を**非同期**で特定する`AsyncIterator`を生成する
/// - Returns: `AlphabetAsyncIterator`
func makeAsyncIterator() -> AlphabetAsyncIterator {
return AlphabetAsyncIterator(input: input)
}
}
/// イテレート処理を非同期で実行する
func iterateAsynchronously() {
Task { @MainActor in
for await alphabet in AlphabetAsyncSequence(input: "あA1いb2うC3") {
print(alphabet)
}
}
}
iterateAsynchronously()
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --
A
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9f4c0>{number = 3, name = (null)} で実行中 --
b
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9f4c0>{number = 3, name = (null)} で実行中 --
C
通常のfor文と何が違うのか
ループで次の要素を特定(=イテレート
)するnext()
が、AsyncIterator
では非同期的に実行され、Iterator
では同期的に実行される。
// AsyncIteratorProtocol
mutating func next() async throws -> Self.Element?
// IteratorProtocol
mutating func next() -> Self.Element?
これは、メインスレッド
で実行される場合にのみ違いが生じる。
/// イテレート処理を同期で行う`Sequence`
struct AlphabetSequence: Sequence {
typealias Iterator = AlphabetIterator
typealias Element = Character
let input: String
init(input: String) {
self.input = input
}
struct AlphabetIterator: IteratorProtocol {
typealias Element = Character
let input: String
static let alphabets: String = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
var index: Int = 0
init(input: String) {
self.input = input
}
/// 次のアルファベットである要素を**同期**で特定する
/// - Returns: 次のアルファベット要素
mutating func next() -> Character? {
var nextCharacter: Character?
while index < input.count {
let sliced: Character = input[String.Index(utf16Offset: index, in: input)]
index += 1
if AlphabetIterator.alphabets.contains(sliced) {
nextCharacter = sliced
Util.printCurrentThread("AlphabetIterator#next()") // -- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
break
}
}
return nextCharacter
}
}
/// 次の要素を**同期**で特定する`AsyncIterator`を生成する
/// - Returns: `AlphabetIterator`
func makeIterator() -> AlphabetIterator {
return AlphabetIterator(input: input)
}
}
/// イテレート処理を同期で実行する
func iterateSynchronously() {
Task { @MainActor in
for alphabet in AlphabetSequence(input: "あA1いb2うC3") {
print(alphabet)
}
}
}
iterateSynchronously()
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
A
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
b
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
C
// AsyncSequence
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --
// Sequence
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
ここで、ループ処理をワーカースレッド
で行うとSequence
とAsyncSequence
の違いはなくなる。
func iterateAsynchronously() {
Task.detached { ... } // Task { @MainActor in ... } から変更
}
func iterateSynchronously() {
Task.detached { ... } // Task { @MainActor in ... } から変更
}
// AsyncSequence
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600003a28340>{number = 7, name = (null)} で実行中 --
// Sequence
-- AlphabetIterator#next(): ワーカースレッド <NSThread: 0x6000032c4680>{number = 4, name = (null)} で実行中 --
AsyncSequenceを生成するAsyncStream
参考: SwiftConcurrencyについて調べたので備忘録
エラーが発生しないAsyncSequenceを生成するAsyncStream
/// `AsyncStream`による非同期イテレート処理を実行する
func iterateWithAsyncStream() {
/// `AsyncStream`を生成する
/// - Returns: `AsyncStream`
@Sendable func createAsyncStream() -> AsyncStream<Int> {
return AsyncStream { (continuation: AsyncStream<Int>.Continuation) in
Task.detached {
for i in 0 ..< 10 {
try? await Task.sleep(until: .now + .nanoseconds(100), clock: .continuous)
// 要素を追加する
continuation.yield(i)
}
// 要素の追加を終了する
continuation.finish()
}
// 要素の追加終了時に呼び出されるコールバック処理
continuation.onTermination = { (termination: AsyncStream<Int>.Continuation.Termination) in
switch termination {
case .cancelled:
print("AsyncStream.Continuation is cancelled.")
case .finished:
print("AsyncStream.Continuation is finished.")
@unknown default:
fatalError()
}
}
}
}
Task.detached {
var result: [Int] = []
for await i in createAsyncStream() {
result.append(i)
}
print("AsyncStream is done. - Result: \(result)")
}
}
iterateWithAsyncStream()
AsyncStream.Continuation is finished.
AsyncStream is done. - Result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
エラーが発生するAsyncSequenceを生成するAsyncThrowingStream
/// `AsyncThrowingStream`による非同期イテレート処理を実行する
func iterateWithAsyncThrowingStream() {
/// `AsyncThrowingStream`を生成する
/// - Returns: `AsyncThrowingStream`
@Sendable func createAsyncThrowingStream() -> AsyncThrowingStream<Int, any Error> {
return AsyncThrowingStream { (continuation: AsyncThrowingStream<Int, any Error>.Continuation) in
Task.detached {
for i in 0 ..< 10 {
// 要素を追加する
continuation.yield(i)
if i >= 5 {
// 要素の追加を終了し、Errorをスローする
continuation.finish(throwing: NSError(domain: "TestError", code: -1))
break
}
}
}
// 要素の追加終了時に呼び出されるコールバック処理
continuation.onTermination = { (termination: AsyncThrowingStream<Int, any Error>.Continuation.Termination) in
switch termination {
case .cancelled:
print("AsyncThrowingStream.Continuation is cancelled.")
case .finished:
print("AsyncThrowingStream.Continuation is finished.")
@unknown default:
fatalError()
}
}
}
}
Task.detached {
var result: [Int] = []
do {
for try await i in createAsyncThrowingStream() {
result.append(i)
}
}
catch {
print(error.localizedDescription)
}
print("AsyncThrowingStream is done. - Result: \(result)")
}
}
iterateWithAsyncThrowingStream()
AsyncThrowingStream.Continuation is finished.
The operation couldn’t be completed. (TestError error -1.)
AsyncThrowingStream is done. - Result: [0, 1, 2, 3, 4, 5]
Taskを構造化するTaskGroup
Task
をTaskGroup
によって構造化することで、動的個数のタスクの並列実行やタスク間の優先度・キャンセル制御を簡潔に行うことができる。
エラーが発生しないタスクを構造化するTaskGroup
/// `TaskGroup`によってタスクを並列実行する
func executeParallelTasksInTaskGroup() {
Task.detached {
await Util.PETTracker.track {
await withTaskGroup(of: Void.self) { (group: inout TaskGroup<Void>) in
let tasks: [String] = ["Task A", "Task B", "Task C"]
for task in tasks {
group.addTask {
return await Util.waitAndPrintAsync(taskName: task)
}
}
await group.waitForAll()
}
}
}
}
executeParallelTasksInTaskGroup()
Task A: start waiting for 1[s]
Task B: start waiting for 1[s]
Task C: start waiting for 1[s]
Task B: end
Task A: end
Task C: end
実行時間: 1.0833109617233276[s]
エラーによってタスクをキャンセルするThrowingTaskGroup
ThrowingTaskGroup
で形成した子タスクでError
がスローされた場合、エラーをスローした子タスクを参照したタイミングで同じ階層の他タスクはキャンセルされ、親タスクにエラーが伝播する。
func executeParallelTasksThrowingErrorInThrowingTaskGroup() {
Task.detached {
await Util.PETTracker.track {
do {
try await withThrowingTaskGroup(of: Void.self) { (group: inout ThrowingTaskGroup<Void, any Error>) in
let tasks: [String] = ["Task A", "Task B", "Task C"]
for task in tasks {
group.addTask {
if task == "Task C" {
throw NSError(domain: "TestError", code: -1)
}
else {
return await Util.waitAndPrintAsync(taskName: task)
}
}
}
try await group.waitForAll()
}
}
catch {
print(error.localizedDescription)
}
}
}
}
executeParallelTasksThrowingErrorInThrowingTaskGroup()
Task A: start waiting for 1[s]
Task B: start waiting for 1[s]
Task A: end
Task B: end
The operation couldn’t be completed. (TestError error -1.)
実行時間: 0.02766096591949463[s]
タスクがキャンセル状態かどうかを判別する
参考: 同時並行処理(Concurrency) - タスクのキャンセル(Task Cancellation)
タスクをキャンセル状態にするには、以下の3つのメソッドのいずれかを用いる。
Task#cancel()
TaskGroup#cancelAll()
ThrowingTaskGroup#cancelAll()
タスクが途中でキャンセルされた場合、キャンセルされたTask
の返り値は以下の3つのいずれかになる。
Error
-
nil
または空のコレクション - キャンセル時点の値
タスクがキャンセル状態であるかどうかを判別したい場合、以下の2つのメソッドのいずれかを用いる。
Task.isCancelled()
Task.checkCancellation()
/// タスクのキャンセル状態を確認する
func checkIfTaskIsCancelled() {
// 外部タスク: 内部タスクのキャンセル状態に応じて文字列を出力し、内部タスクがキャンセルされていなければその値を返却する
let outer: Task<[Int], Never> = Task.detached {
// 内部タスク: 1〜10000の整数値を1つずつ追加した配列を返却する
let inner: Task<[Int], any Error> = Task.detached {
var result: [Int] = []
for i in 1...10000 {
try Task.checkCancellation()
result.append(i)
}
return result
}
// 内部タスクをキャンセル状態にする
inner.cancel()
if inner.isCancelled {
print("Inner Task is cancelled.")
}
if Task.isCancelled {
// 構造化されていないTaskはキャンセル状態が伝播しない
print("Outer Task is cancelled.")
}
do {
return try await inner.value
}
catch {
print(error.localizedDescription)
return [-1]
}
}
Task {
print("Outer Task returns \(await outer.value).")
}
}
checkIfTaskIsCancelled()
Inner Task is cancelled.
The operation couldn’t be completed. (Swift.CancellationError error 1.)
Outer Task returns [-1].
Structured ConcurrencyとUnstructured Concurrency
参考: 同時並行処理(Concurrency) - 独立した並行処理(Unstructured Concurrency)
参考: Introduction to Unstructured Concurrency in Swift
Structured Concurrency(構造化された並行性)
とUnstructured Concurrency(独立した並行性)
の違いをまとめると、以下のようになる。
Concurrency | 特徴 | 生成手法 |
---|---|---|
Structured | タスクツリーが形成される |
async let バインディングTaskGroup
|
Unstructured | タスクツリーが形成されない |
Task.init(priority:operation:) Task.detached(priority:operation:)
|
Structured Concurrency
はTask間で親子関係を表すタスクツリーが形成され、キャンセルの伝播・コンパイルエラーの検知がある程度自動で行われるが、非同期関数内でしか扱うことができない。
一方でUnstructured Concurrency
は同期関数内でも扱える一方で、タスクツリーによる自動キャンセル処理機構がないため、キャンセル操作を明示的に実行する必要がある。