LoginSignup
10
7

More than 1 year has passed since last update.

Swift Concurrencyを試す

Last updated at Posted at 2022-11-05

参考書籍

一冊でマスター!Swift Concurrency入門

検証で利用するUtilクラスとError

UtilクラスとError
struct Util {
  /// 現在実行中のスレッドを出力する
  /// - Parameter point: 実行ポイント
  static func printCurrentThread(_ point: String) {
    let threadName: String = Thread.isMainThread ? "メイン" : "ワーカー"
    print("-- \(point): \(threadName)スレッド \(Thread.current.description) で実行中 --")
  }

  /// 実行開始と`sec`[秒]待機後の終了を通知する同期関数
  /// - Parameters:
  ///   - taskName: タスクの名称
  ///   - sec: 実行待機時間[秒]
  static func waitAndPrintSync(taskName: String, _ sec: Double = 1) {
    print("\(taskName): start waiting for \(sec)[s]")
    Thread.sleep(forTimeInterval: sec)
    print("\(taskName): end")
  }
  
  /// 実行開始と`sec`[秒]待機後の終了を通知する非同期関数
  /// - Parameters:
  ///   - taskName: タスクの名称
  ///   - sec: 実行待機時間[秒]
  static func waitAndPrintAsync(taskName: String, _ sec: Int = 1) async {
    print("\(taskName): start waiting for \(sec)[s]")
    try? await Task.sleep(until: .now + .seconds(sec), clock: .continuous)
    print("\(taskName): end")
  }
  
  /// プログラム実行時間(Program Execution Time; PET)を計測するオブジェクト
  struct PETTracker {
    /// `action`の実行時間を計測する
    /// - parameter action: 実行時間を計測するブロック
    static func track(_ action: () async -> Void) async {
      let start: Date = Date()
      await action()
      let end: Date = Date()
      
      let span: TimeInterval = end.timeIntervalSince(start)
      print("実行時間: \(span)[s]")
    }
  }
}

/// ユーザ定義エラー
enum TestError: Error {
  case Error
}

直列実行と静的な並列実行

Taskの並列実行を行う手法は以下の2つ。

  • async letバインディング(下記に示す)を用いる
  • TaskGroup(後述)を用いる

async letバインディングは並列実行を行うTaskの個数を静的に指定する一方で、TaskGroupを用いることで動的個数Taskを実行することができる。

/// async関数を直列・並列に実行する
func compareSerialWithParallel() {
  Util.printCurrentThread("①")  // -- ①: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
  
  Task.detached {
    Util.printCurrentThread("②")  // -- ②: ワーカースレッド <NSThread: 0x6000011e4a80>{number = 6, name = (null)} で実行中 --
    
    // 直列処理
    await Util.waitAndPrintAsync(taskName: "Serial-1")
    await Util.waitAndPrintAsync(taskName: "Serial-2")
    await Util.waitAndPrintAsync(taskName: "Serial-3")
    
    Util.printCurrentThread("③")  // -- ③: ワーカースレッド <NSThread: 0x6000011dc700>{number = 4, name = (null)} で実行中 --
    
    // 並列処理
    async let first: Void = Util.waitAndPrintAsync(taskName: "Parallel-1")
    async let second: Void = Util.waitAndPrintAsync(taskName: "Parallel-2")
    async let third: Void = Util.waitAndPrintAsync(taskName: "Parallel-3")
    await (first, second, third)
    
    Util.printCurrentThread("④")  // -- ④: ワーカースレッド <NSThread: 0x6000011c2380>{number = 5, name = (null)} で実行中 --
  }
  
  Util.printCurrentThread("⑤")  // -- ⑤: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
}
実行結果
-- ①: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
-- ②: ワーカースレッド <NSThread: 0x6000011e4a80>{number = 6, name = (null)} で実行中 --
Serial-1: start waiting for 1[s]
-- ⑤: メインスレッド <_NSMainThread: 0x6000011c0140>{number = 1, name = main} で実行中 --
Serial-1: end
Serial-2: start waiting for 1[s]
Serial-2: end
Serial-3: start waiting for 1[s]
Serial-3: end
-- ③: ワーカースレッド <NSThread: 0x6000011dc700>{number = 4, name = (null)} で実行中 --
Parallel-1: start waiting for 1[s]
Parallel-2: start waiting for 1[s]
Parallel-3: start waiting for 1[s]
Parallel-3: end
Parallel-1: end
Parallel-2: end
-- ④: ワーカースレッド <NSThread: 0x6000011c2380>{number = 5, name = (null)} で実行中 --

データ競合を保証するSendable

参考: Sendable and @Sendable closures

Sendableプロトコルはデータ競合が発生しないことを示すためのプロトコルである。
Sendableプロトコルに準拠させる場合、データ競合が発生しないデータとなるよう以下のいずれかの条件を満たしている必要がある。

  • 全てのメンバ・連想値がSendableプロトコルに準拠した値型データ
  • サブクラス含めletで定義された不変メンバしか持たない不変クラス
  • NSLockDispatchQueueDispatchSemaphoreを用いたロック、Swift Atomicsを用いたアトミックを採用した内部同期クラス
  • ディープコピーを行うディープコピークラス

Actor外のTaskで呼ばれる、Concurrently(並行的) に動作するローカル関数には、データ競合を避けるため@Sendableアノテーションを付与する必要がある。
Swift 5.7では警告で済むものの、Swift 6以降はエラーになる。

@Sendable属性を付与しない場合に出力される警告
Concurrently-executed local function 'newAsync(taskName:)' must be marked as '@Sendable'; this is an error in Swift 6

@escapingな同期関数 → asyncな非同期関数 の変換

Swift Concurrencyに未対応の非同期メソッドをConcurrencyに組み込む場合、その非同期メソッドがエラーをthrowするかどうかに応じて以下の2つのメソッドを使い分ける。

  • withCheckedContinuation(function:_:)
  • withCheckedThrowingContinuation(function:_:)

エラーが発生しない同期関数の場合

/// `completionHandler`を用いた従来の同期関数を`async`を用いた非同期関数で実行する(エラー送出なし)
func execCompletionHandlerInConcurrentContext() {
  /// `completionHandler`を用いた従来の同期関数
  /// - Parameters:
  ///   - taskName: タスクの名称
  ///   - completionHandler: 非同期で実行される無名関数
  @Sendable func oldAsync(taskName:String, completionHandler: @escaping () -> Void) {
    Util.waitAndPrintSync(taskName: taskName)
    completionHandler()
  }
  
  
  /// `async`を用いた新たな非同期関数
  /// - Parameter taskName: タスクの名称
  @Sendable func newAsync(taskName: String) async {
    return await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
      oldAsync(taskName: taskName) {
        continuation.resume()
      }
    }
  }
  
  Task.detached {
    await newAsync(taskName: "Task")
  }
}

execCompletionHandlerInConcurrentContext()
実行結果
Task: start waiting for 1.0[s]
Task: end

エラーが発生する同期関数の場合

/// `completionHandler`を用いた従来の同期関数を`async`を用いた非同期関数で実行する(エラー送出あり)
func execCompletionHandlerThrowingErrorInConcurrentContext() {
  /// `completionHandler`を用いた従来の同期関数
  /// - Parameters:
  ///   - taskName: タスクの名称
  ///   - completionHandler: 非同期で実行される無名関数
  @Sendable func oldAsync(taskName: String, completionHandler: @escaping (Result<Void, any Error>) -> Void) {
    Util.waitAndPrintSync(taskName: taskName)
    completionHandler(.failure(NSError(domain: "TestError", code: -1)))
  }
  
  /// `async`を用いた新たな非同期関数
  /// - Parameter taskName: タスクの名称
  /// - Throws: `NSError`
  @Sendable func newAsync(taskName: String) async throws {
    return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
      oldAsync(taskName: taskName) { (result: Result<Void, any Error>) in
        continuation.resume(with: result)
      }
    }
  }
  
  Task.detached {
    do {
      try await newAsync(taskName: "Task")
    }
    catch {
      print(error.localizedDescription)
    }
  }
}

execCompletionHandlerThrowingErrorInConcurrentContext()

実行結果
Task: start waiting for 1.0[s]
Task: end
The operation couldn’t be completed. (TestError error -1.)

データ競合を防ぐが競合状態は防げないActor

参考: データ競合(data race)と競合状態(race condition)を混同しない

データ競合(Data Race) … 複数スレッド上で少なくともいずれかは書き込みである、同一変数に対する同時アクセスが発生することで、その出力結果(または動作)が予期できない事象

UsernameCreatorをactorではなくclassにした場合、実行結果は以下のように出力される。
↑ Task AによるRead中に、Task BでWriteが行われた?

実行結果(データ競合が発生)
Task A: Opt Redvuv

(処理が中断され、以下のwarningが出力される)

Playground execution terminated because the process stopped unexpectedly.

競合状態(Race Condition) … 複数スレッド上で操作が非同期的(ただし同時ではない)に行われることで、実装の想定外の結果になる事象

以下の例では、2つのTaskでcreator.username(userId: 1)と入力しているため、Task AとTask Bは同じ出力結果になるはずにもかかわらず、実際には異なるusernameが出力されている。

これは、以下の順序で操作が行われたことが原因である。

① Task Aでusername(userId:)が実行開始
② Task Aでキャッシュが存在しないと判定される
③ Task Aで非同期的にユーザ名を生成するため、username(userId:)がawait状態になる(ここでTask Bでのusername(userId:)が実行可能になる)
④ Task Bでusername(userId:)が実行開始
Task Bでキャッシュが存在しないと判定される ← この実装が競合状態の原因
⑥ Task Bで非同期的にユーザ名を生成するため、username(userId:)がawait状態になる(ここでTask Aでのusername(userId:)が再開可能になる)
⑦ Task Bで生成されたユーザ名がキャッシュに保管・出力される(ここでTask Bが終了し、Task Aが再開可能になる)
⑧ Task Aで生成されたユーザ名がキャッシュに保管・出力される(ここでTask Aが終了)

実行結果(出力順不同、抜粋)
Task A: Og Ejmoada
Task B: Ra Mbdbhdg
Actor
/// **データ競合**を防ぐが**競合状態**にはなりうるActor
actor UsernameCreator {
  /// キャッシュ
  private var cache: [Int : String] = [Int : String]()
  
  /// `userId`に紐づくユーザ名を取得する
  /// - Parameter userId: ユーザID
  /// - Returns: ユーザIDに紐づくユーザ名
  func username(userId: Int) async -> String {
    // キャッシュに保管されたusernameがあればそれを返却
    if let cached: String = cache[userId] {
      return cached
    }
    
    // キャッシュになければ半角スペース含めて10字のユーザ名を生成
    let randomizedUsername: String = await UsernameCreator.random(length: 10)
    
    // キャッシュに保管しusernameを返却
    cache[userId] = randomizedUsername
    return cache[userId]!
  }
  
  /// 指定文字数のランダムなユーザ名を生成する
  /// - Parameter length: ユーザ名のスペースを含む文字数
  /// - Returns: ランダムに生成されたユーザ名
  static func random(length: Int) async -> String {
    try? await Task.sleep(until: .now + .seconds(2), clock: .continuous)
    
    let uppercase: String = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    let lowercase: String = "abcdefghijklmnopqrstuvwxyz"
    let spaceIndex: Int = length <= 1 ? -1 : Int.random(in: 2 ..< length)
    var username: String = ""
    
    for i in 1 ... length {
      switch i {
        case 1, spaceIndex + 1:
          username += String(uppercase.randomElement()!)
        case spaceIndex:
          username += " "
        default:
          username += String(lowercase.randomElement()!)
      }
    }
    
    return username
  }
}

/// 競合状態を発生させる
func causeRaceCondition() {
  let creator: UsernameCreator = UsernameCreator()
  Task.detached {
    print("Task A: \(await creator.username(userId: 1))")
  }
  
  Task.detached {
    print("Task B: \(await creator.username(userId: 1))")
  }
}

causeRaceCondition()
実行結果
Task A: Zsiw Xtksm
Task B: Vqp Hsdgvo

なぜactorは競合状態を防げないのか

参考: Swift Concurrency まとめ(正式版対応済) - アクターの再入可能性(reentrancy)

actorで定義されたプロパティ・メソッドに対して、複数スレッド上での同時アクセス・実行ができないため、一見すると競合状態を防げているように思える。

actor RaceConditionCauser {
  var num: Int = 0
  func updateIfNumEqualsZero() async {...}
}

しかし、actorで定義されたメソッドの「同時実行ができない」というのは、actorメソッド内の一行の操作を同時に行えるのが一つのスレッドのみということであり、実装によっては競合状態が発生する可能性がある。

/// `num`が0であれば0以外の整数値、0でなければその値を返却する
func updateIfNumEqualsZero() async -> Int {
  guard num == 0 else { return num }  // ① num == 0かどうかを評価し、0でない場合は早期Return
  num = await generateIntExceptForZero()  // ② 0以外の整数値を非同期で生成し、numに代入
  return num  // ③ 生成した0以外の整数値であるnumを返却
}

actorで定義されたメソッドはawait状態になるまで他スレッドでの実行を防ぐものの、awaitによって待機状態になると処理の途中であっても他スレッドから実行することができる(= 再入可能(reentrant)である)ため、操作順序によっては意図しない動作(競合状態)を引き起こす可能性がある。

ワーカースレッド(Task) Taskの内容
A updateIfNumEqualsZero()が実行開始、他スレッドによる実行を禁止
A ①が実行 ※ num == 0
A ②のgenerateIntExceptForZero()が実行開始
A 他スレッドによるupdateIfNumEqualsZero()の実行を許可(await状態)
B updateIfNumEqualsZero()が実行開始、他スレッドによる実行を禁止
B ①が実行 ※ num == 0
B ②のgenerateIntExceptForZero()が実行開始
B 他スレッドによるupdateIfNumEqualsZero()の実行を許可(await状態)
B updateIfNumEqualsZero()を再開、他スレッドによる実行を禁止
B 生成した乱数値をnumに代入しreturn
B 他スレッドによるupdateIfNumEqualsZero()の実行を許可(実行終了)
A updateIfNumEqualsZero()を再開、他スレッドによる実行を禁止
A 生成した乱数値をnumに代入しreturn
A 他スレッドによるupdateIfNumEqualsZero()の実行を許可(実行終了)

競合状態を防ぐTask

競合状態を防ぐには、以下の手法が挙げられる。

① 直列実行する
await後に再度キャッシュをチェックする (awaitによって状態が変化している可能性があるため)
③ Taskの実行結果ではなく、Taskをキャッシュしtask.valueを`awaitで取得する

ここでは、③の手法で実装する。

/// 競合状態を防ぐActor
actor UpdatedUsernameCreator {
  private enum CacheEntry {
    case inProgress(Task<String, Never>)
    case ready(String)
  }
  
  /// キャッシュ
  private var cache: [Int : CacheEntry] = [Int : CacheEntry]()
  
  /// `userId`に紐づくユーザ名を取得する
  /// - Parameter userId: ユーザID
  /// - Returns: ユーザIDに紐づくユーザ名
  func username(userId: Int) async -> String {
    // CacheEntryのキャッシュがあればそこからusernameを取得
    if let cached: CacheEntry = cache[userId] {
      switch cached {
        case .ready(let username):
          return username
        case .inProgress(let task):
          return await task.value
      }
    }
    
    // CacheEntryのキャッシュがなければキャッシュにTaskを追加
    let task: Task<String, Never> = Task {
      await UsernameCreator.random(length: 10)
    }
    
    // inProgress状態でキャッシュに保管
    cache[userId] = .inProgress(task)
    // Suspension Pointを設定することで他スレッドでのusername(userId:)を実行可能にする
    let username: String = await task.value
    cache[userId] = .ready(username)
    
    return username
  }
}

/// 競合状態が発生しない
func neverCauseRaceCondition() {
  let creator: UpdatedUsernameCreator = UpdatedUsernameCreator()
  
  Task.detached {
    print("Task A: \(await creator.username(userId: 1))")
  }
  
  Task.detached {
    print("Task B: \(await creator.username(userId: 1))")
  }
}

neverCauseRaceCondition()
実行結果
Task B: Xd Ulusmos
Task A: Xd Ulusmos

非同期で要素をイテレートするAsyncSequence

メインスレッドでループ処理を実行 かつ イテレート処理を非同期的に行うシーケンスを作成する場合、AsyncSequenceに準拠させる。

/// イテレート処理を非同期で行う`AsyncSequence`
struct AlphabetAsyncSequence: AsyncSequence {
  /// `AsyncIteratorProtocol`に準拠する型エイリアス
  /// - note: ※ `AsyncIterator`という名前で宣言する場合は不要
  typealias AsyncIterator = AlphabetAsyncIterator
  
  /// シーケンスの要素の型エイリアス
  /// - note: `AsyncSequence.Element == AsyncSequence.AsyncIterator.Element`
  typealias Element = Character
  
  /// 全体集合となる文字列
  let input: String
  
  init(input: String) {
    self.input = input
  }
  
  /// `AsyncSequence`の次の要素を**非同期**で特定する`AsyncIterator`
  struct AlphabetAsyncIterator: AsyncIteratorProtocol {
    /// シーケンスの要素の型エイリアス
    /// - note: `AsyncSequence.Element == AsyncSequence.AsyncIterator.Element`
    typealias Element = Character
    
    /// 全体集合となる文字列
    let input: String
    
    static let alphabets: String = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    
    /// 現在の要素のインデックス番号
    var index: Int = 0
    
    init(input: String) {
      self.input = input
    }
    
    /// 次のアルファベットである要素を**非同期**で特定する
    /// - Returns: 次のアルファベット要素
    mutating func next() async -> Character? {
      var nextCharacter: Character?
      while index < input.count {
        let sliced: Character = input[String.Index(utf16Offset: index, in: input)]
        
        index += 1
        
        if AlphabetAsyncIterator.alphabets.contains(sliced) {
          nextCharacter = sliced
          Util.printCurrentThread("AlphabetAsyncIterator#next()")  // -- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --
          break
        }
      }
      
      return nextCharacter
    }
  }
  
  /// 次の要素を**非同期**で特定する`AsyncIterator`を生成する
  /// - Returns: `AlphabetAsyncIterator`
  func makeAsyncIterator() -> AlphabetAsyncIterator {
    return AlphabetAsyncIterator(input: input)
  }
}

/// イテレート処理を非同期で実行する
func iterateAsynchronously() {
  Task { @MainActor in
    for await alphabet in AlphabetAsyncSequence(input: "あA1いb2うC3") {
      print(alphabet)
    }
  }
}

iterateAsynchronously()
実行結果
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --
A
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9f4c0>{number = 3, name = (null)} で実行中 --
b
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9f4c0>{number = 3, name = (null)} で実行中 --
C

通常のfor文と何が違うのか

ループで次の要素を特定(=イテレート)するnext()が、AsyncIteratorでは非同期的に実行され、Iteratorでは同期的に実行される。

// AsyncIteratorProtocol
mutating func next() async throws -> Self.Element?

// IteratorProtocol
mutating func next() -> Self.Element?

これは、メインスレッドで実行される場合にのみ違いが生じる。

メインスレッドでSequenceによるループを実行
/// イテレート処理を同期で行う`Sequence`
struct AlphabetSequence: Sequence {
  typealias Iterator = AlphabetIterator
  typealias Element = Character
  let input: String
  
  init(input: String) {
    self.input = input
  }
  
  struct AlphabetIterator: IteratorProtocol {
    typealias Element = Character
    
    let input: String
    static let alphabets: String = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    var index: Int = 0
    
    init(input: String) {
      self.input = input
    }
    
    /// 次のアルファベットである要素を**同期**で特定する
    /// - Returns: 次のアルファベット要素
    mutating func next() -> Character? {
      var nextCharacter: Character?
      while index < input.count {
        let sliced: Character = input[String.Index(utf16Offset: index, in: input)]
        
        index += 1
        
        if AlphabetIterator.alphabets.contains(sliced) {
          nextCharacter = sliced
          Util.printCurrentThread("AlphabetIterator#next()")  // -- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
          break
        }
      }
      
      return nextCharacter
    }
  }
  
  /// 次の要素を**同期**で特定する`AsyncIterator`を生成する
  /// - Returns: `AlphabetIterator`
  func makeIterator() -> AlphabetIterator {
    return AlphabetIterator(input: input)
  }
}

/// イテレート処理を同期で実行する
func iterateSynchronously() {
  Task { @MainActor in
    for alphabet in AlphabetSequence(input: "あA1いb2うC3") {
      print(alphabet)
    }
  }
}

iterateSynchronously()
実行結果
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
A
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
b
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --
C
メインスレッドでループさせたAsyncSequenceとSequenceの比較
// AsyncSequence
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600002f9cbc0>{number = 6, name = (null)} で実行中 --

// Sequence
-- AlphabetIterator#next(): メインスレッド <_NSMainThread: 0x6000021d8340>{number = 1, name = main} で実行中 --

ここで、ループ処理をワーカースレッドで行うとSequenceAsyncSequenceの違いはなくなる。

func iterateAsynchronously() {
  Task.detached { ... }  // Task { @MainActor in ... } から変更
}

func iterateSynchronously() {
  Task.detached { ... }  // Task { @MainActor in ... } から変更
}
ワーカースレッドでイテレートさせたAsyncSequenceとSequenceの比較
// AsyncSequence
-- AlphabetAsyncIterator#next(): ワーカースレッド <NSThread: 0x600003a28340>{number = 7, name = (null)} で実行中 --

// Sequence
-- AlphabetIterator#next(): ワーカースレッド <NSThread: 0x6000032c4680>{number = 4, name = (null)} で実行中 --

AsyncSequenceを生成するAsyncStream

参考: SwiftConcurrencyについて調べたので備忘録

エラーが発生しないAsyncSequenceを生成するAsyncStream

/// `AsyncStream`による非同期イテレート処理を実行する
func iterateWithAsyncStream() {
  /// `AsyncStream`を生成する
  /// - Returns: `AsyncStream`
  @Sendable func createAsyncStream() -> AsyncStream<Int> {
    return AsyncStream { (continuation: AsyncStream<Int>.Continuation) in
      Task.detached {
        for i in 0 ..< 10 {
          try? await Task.sleep(until: .now + .nanoseconds(100), clock: .continuous)
          // 要素を追加する
          continuation.yield(i)
        }
        // 要素の追加を終了する
        continuation.finish()
      }
      
      // 要素の追加終了時に呼び出されるコールバック処理
      continuation.onTermination = { (termination: AsyncStream<Int>.Continuation.Termination) in
        switch termination {
          case .cancelled:
            print("AsyncStream.Continuation is cancelled.")
          case .finished:
            print("AsyncStream.Continuation is finished.")
          @unknown default:
            fatalError()
        }
      }
    }
  }
  
  Task.detached {
    var result: [Int] = []
    for await i in createAsyncStream() {
      result.append(i)
    }
    print("AsyncStream is done. - Result: \(result)")
  }
}

iterateWithAsyncStream()
実行結果
AsyncStream.Continuation is finished.
AsyncStream is done. - Result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

エラーが発生するAsyncSequenceを生成するAsyncThrowingStream

/// `AsyncThrowingStream`による非同期イテレート処理を実行する
func iterateWithAsyncThrowingStream() {
  /// `AsyncThrowingStream`を生成する
  /// - Returns: `AsyncThrowingStream`
  @Sendable func createAsyncThrowingStream() -> AsyncThrowingStream<Int, any Error> {
    return AsyncThrowingStream { (continuation: AsyncThrowingStream<Int, any Error>.Continuation) in
      Task.detached {
        for i in 0 ..< 10 {
          // 要素を追加する
          continuation.yield(i)
          
          if i >= 5 {
            // 要素の追加を終了し、Errorをスローする
            continuation.finish(throwing: NSError(domain: "TestError", code: -1))
            break
          }
        }
      }
      
      // 要素の追加終了時に呼び出されるコールバック処理
      continuation.onTermination = { (termination: AsyncThrowingStream<Int, any Error>.Continuation.Termination) in
        switch termination {
          case .cancelled:
            print("AsyncThrowingStream.Continuation is cancelled.")
          case .finished:
            print("AsyncThrowingStream.Continuation is finished.")
          @unknown default:
            fatalError()
        }
      }
    }
  }
  
  Task.detached {
    var result: [Int] = []
    do {
      for try await i in createAsyncThrowingStream() {
        result.append(i)
      }
    }
    catch {
      print(error.localizedDescription)
    }
    print("AsyncThrowingStream is done. - Result: \(result)")
  }
}

iterateWithAsyncThrowingStream()
実行結果
AsyncThrowingStream.Continuation is finished.
The operation couldn’t be completed. (TestError error -1.)
AsyncThrowingStream is done. - Result: [0, 1, 2, 3, 4, 5]

Taskを構造化するTaskGroup

TaskTaskGroupによって構造化することで、動的個数のタスクの並列実行やタスク間の優先度・キャンセル制御を簡潔に行うことができる。

エラーが発生しないタスクを構造化するTaskGroup

/// `TaskGroup`によってタスクを並列実行する
func executeParallelTasksInTaskGroup() {
  Task.detached {
    await Util.PETTracker.track {
      await withTaskGroup(of: Void.self) { (group: inout TaskGroup<Void>) in
        let tasks: [String] = ["Task A", "Task B", "Task C"]
        for task in tasks {
          group.addTask {
            return await Util.waitAndPrintAsync(taskName: task)
          }
        }
        
        await group.waitForAll()
      }
    }
  }
}

executeParallelTasksInTaskGroup()
実行結果
Task A: start waiting for 1[s]
Task B: start waiting for 1[s]
Task C: start waiting for 1[s]
Task B: end
Task A: end
Task C: end
実行時間: 1.0833109617233276[s]

エラーによってタスクをキャンセルするThrowingTaskGroup

ThrowingTaskGroupで形成した子タスクでErrorがスローされた場合、エラーをスローした子タスクを参照したタイミングで同じ階層の他タスクはキャンセルされ、親タスクにエラーが伝播する。

func executeParallelTasksThrowingErrorInThrowingTaskGroup() {
  Task.detached {
    await Util.PETTracker.track {
      do {
        try await withThrowingTaskGroup(of: Void.self) { (group: inout ThrowingTaskGroup<Void, any Error>) in
          let tasks: [String] = ["Task A", "Task B", "Task C"]
          for task in tasks {
            group.addTask {
              if task == "Task C" {
                throw NSError(domain: "TestError", code: -1)
              }
              else {
                return await Util.waitAndPrintAsync(taskName: task)
              }
            }
          }
          try await group.waitForAll()
        }
      }
      catch {
        print(error.localizedDescription)
      }
    }
  }
}

executeParallelTasksThrowingErrorInThrowingTaskGroup()
実行結果
Task A: start waiting for 1[s]
Task B: start waiting for 1[s]
Task A: end
Task B: end
The operation couldn’t be completed. (TestError error -1.)
実行時間: 0.02766096591949463[s]

タスクがキャンセル状態かどうかを判別する

参考: 同時並行処理(Concurrency) - タスクのキャンセル(Task Cancellation)

タスクをキャンセル状態にするには、以下の3つのメソッドのいずれかを用いる。

  • Task#cancel()
  • TaskGroup#cancelAll()
  • ThrowingTaskGroup#cancelAll()

タスクが途中でキャンセルされた場合、キャンセルされたTaskの返り値は以下の3つのいずれかになる。

  • Error
  • nilまたは空のコレクション
  • キャンセル時点の値

タスクがキャンセル状態であるかどうかを判別したい場合、以下の2つのメソッドのいずれかを用いる。

  • Task.isCancelled()
  • Task.checkCancellation()
/// タスクのキャンセル状態を確認する
func checkIfTaskIsCancelled() {
  // 外部タスク: 内部タスクのキャンセル状態に応じて文字列を出力し、内部タスクがキャンセルされていなければその値を返却する
  let outer: Task<[Int], Never> = Task.detached {
    // 内部タスク: 1〜10000の整数値を1つずつ追加した配列を返却する
    let inner: Task<[Int], any Error> = Task.detached {
      var result: [Int] = []
      for i in 1...10000 {
        try Task.checkCancellation()
        result.append(i)
      }
      return result
    }
    
    // 内部タスクをキャンセル状態にする
    inner.cancel()
    
    if inner.isCancelled {
      print("Inner Task is cancelled.")
    }
    if Task.isCancelled {
      // 構造化されていないTaskはキャンセル状態が伝播しない
      print("Outer Task is cancelled.")
    }
    
    do {
      return try await inner.value
    }
    catch {
      print(error.localizedDescription)
      return [-1]
    }
  }
  
  Task {
    print("Outer Task returns \(await outer.value).")
  }
}

checkIfTaskIsCancelled()
実行結果
Inner Task is cancelled.
The operation couldn’t be completed. (Swift.CancellationError error 1.)
Outer Task returns [-1].

Structured ConcurrencyとUnstructured Concurrency

参考: 同時並行処理(Concurrency) - 独立した並行処理(Unstructured Concurrency)
参考: Introduction to Unstructured Concurrency in Swift

Structured Concurrency(構造化された並行性)Unstructured Concurrency(独立した並行性)の違いをまとめると、以下のようになる。

Concurrency 特徴 生成手法
Structured タスクツリーが形成される async letバインディング
TaskGroup
Unstructured タスクツリーが形成されない Task.init(priority:operation:)
Task.detached(priority:operation:)

Structured ConcurrencyはTask間で親子関係を表すタスクツリーが形成され、キャンセルの伝播・コンパイルエラーの検知がある程度自動で行われるが、非同期関数内でしか扱うことができない。
一方でUnstructured Concurrencyは同期関数内でも扱える一方で、タスクツリーによる自動キャンセル処理機構がないため、キャンセル操作を明示的に実行する必要がある。

10
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
7