LoginSignup
6
5

Swift Concurrency AsyncStreamを使ってみる

Last updated at Posted at 2023-12-23

はじめに

この記事はTimee Advent Calendar 2023の24日目の記事です。
書いてて気づいたんですが、24日はM-1の決勝の日でした!
がんばれ真空ジェシカ!

改めまして、こんにちは。最近インデントはスペース2個派になったhayakawaです。
2023年9月にタイミーのiOSエンジニアとしてジョインしました。

今回はSwiftの非同期プログラミングを行うための機能の一部である、AsyncStreamについて学んだことの備忘録となります。

AsyncStream

上記のドキュメントを読むとAsyncStreamはAsyncSequenceに準拠していると書いてあります。
ではAsyncSequenceとは何かというと、こちらもドキュメントによるとSequenceプロトコルに似ていると書いてあります。
Sequenceプロトコルに準拠すると、お馴染みのfor-inやforEach、filterなどの高階関数が使用できるようになります。
つまり、これのasync/await版という解釈で良いかなと思っています。
AsyncStreamでラップすることでAsyncSequenceに準拠したものを作ることができ、非同期処理の1つ1つの結果の値を取得ののち、高階関数を用いて値の変換などができるようになります。

AsyncStreamを実際に動かしてみる

let numbers = Array(0..<5)

func numberStream() -> AsyncStream<Int> {
  AsyncStream { continuation in
    Task {
      for number in numbers {
        try await Task.sleep(nanoseconds: 1_000_000_000)
        continuation.yield(number)
      }
      continuation.finish()
    }
  }
}

Task {
  for await number in numberStream() {
    print("value: \(number)")
  }
  print("Stream Finished")
}

上記のコードは1秒ごとに配列内の値を順にストリームへ流しています。
実行結果を見ると、値が順番に流れfor-inを抜けたタイミングでストリームが完了していることが分かります。
continuationの.yield(_:)を実行することでストリームに値が流れます。
.finish()を実行することでストリームが完了します。

実行結果
value: 0
value: 1
value: 2
value: 3
value: 4
Stream Finish

このようにonTermination:でも終了時の処理を行うことができます。

func numberStream() -> AsyncStream<Int> {
  AsyncStream { continuation in
    Task {
      for number in numbers {
        try await Task.sleep(nanoseconds: 1_000_000_000)
        continuation.yield(number)
      }
      continuation.finish()
    }
      
    continuation.onTermination = { _ in
       print("Stream Finished") // 終了時の処理を行う
    }
  }
}

実行結果は同じになります。

実行結果
value: 0
value: 1
value: 2
value: 3
value: 4
Stream Finish

onTermination:の引数を取るようにすれば終了した原因によって処理を分岐をすることができます。

func numberStream() -> AsyncStream<Int> {
  AsyncStream { continuation in
    Task {
      for number in numbers {
        try await Task.sleep(nanoseconds: 1_000_000_000)
        continuation.yield(number)
      }
      continuation.finish()
    }
      
    continuation.onTermination = { termination in
        // 終了時の処理を条件によって分岐する
        switch termination {
        case .finished: print("Stream Finished")
        case .cancelled: print("Stream Cancelled")
        }
      }
  }
}

AsyncThrowingStream

AsyncStreamに似たものに、AsyncThrowingStreamというものがあります。
こちらはAsyncStreamと同じようにAsyncSequenceに準拠している点では一緒ですが、エラーを流すことができるという違いがあります。
そのためAPIからレスポンスの取得などの、非同期処理で失敗するケースがある場合に利用できそうです。

AsyncThrowingStreamを実際に動かしてみる

let numbers = Array(0..<5)

func numberStream() -> AsyncThrowingStream<Int, Error> {
  AsyncThrowingStream { continuation in
    Task {
      for number in numbers {
        try await Task.sleep(nanoseconds: 1_000_000_000)
        continuation.yield(number)
      }
      continuation.finish()
    }
      
    continuation.onTermination = { termination in
        switch termination {
        case .finished(let error): // エラーを受け取ることができる
          if error != nil {
            print("Stream Finished")
            print("Errorによって終了: \(error)")
          } else {
            print("Stream Finished")
            print("正常終了")
          }
        case .cancelled: print("Stream Cancelled")
        }
      }
  }
}

Task {
  for try await number in numberStream() {
    print("value: \(number)")
  }
}

基本的にはAsyncStreamの時と変わりませんが、.finished()でエラーを受け取ることができるようになっています。
今回はエラーを起こしていないので、実行結果から正常にストリームが完了したことが分かります。

実行結果
value: 0
value: 1
value: 2
value: 3
value: 4
Stream Finished
正常終了

最後に

今更ながらAsyncStreamについてまとめてみました。
今回のサンプルでは通信を行なっていないので、簡単なAPI通信を行うサンプルも作ってみたいと思います。

ストリームの概念がSwiftの標準に入ってくることで、Swift本体でできることがどんどん増えていきますね。
タイミーでは順次RxSwiftからSwift Concurrencyへと移行しているので、近いうちにAsyncStreamの出番がやってくるのではないかと思っています。

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5