AsyncStreamのイニシャライザはbufferingPolicy
という引数を取ります。
init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: (AsyncStream<Element>.Continuation) -> Void
)
サブスクライバーがストリームをサブスクライブするよりも前に、ストリームに要素が流れる可能性があります。
このため、AsyncStreamには要素をバッファする機能があります。
以下の例では、ストリームには1秒ごとに0からインクリメントされた数字が流れてきます。
ここで、bufferingPolicy
に.bufferingNewest(1)
を指定します。
.bufferingNewest(1)
はストリームに流された最後の要素1つだけをバッファするという指定になります。
for-await-in文は10秒後に実行されるため、ストリームにはすでに0〜9の数字が流れた状態です。
このため、バッファされた9が最初に出力され、その後1秒ごとにインクリメントされた数字が出力されます。
バッファする数を変更すれば、出力内容も変わります。
例えば.bufferingNewest(5)
とした場合、バッファされた5〜9が最初に出力され、その後1秒ごとにインクリメントされた数字が出力されます。
let stream = AsyncStream<Int>(bufferingPolicy: .bufferingNewest(1)) { continuation in
Task.detached {
for i in 0..<100 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(i)
}
continuation.finish()
}
}
Task.detached {
try await Task.sleep(nanoseconds: 10 * 1_000_000_000)
for await i in stream {
print(i)
}
}
// 結果
// 9
// 10
// 11
// ...
BufferingPolicy型には他にbufferingOldest
とunbounded
があります。
上記の例で.bufferingOldest(5)
とした場合、バッファされた0〜4が最初に出力され、その後は10から順に出力されます。
// 結果
// 0
// 1
// 2
// 3
// 4
// 10
// 11
// ...
unbounded
とした場合は全ての要素がバッファされるため、0〜9が最初に出力され、その後は10から順に出力されます。
なお、AsyncStreamのイニシャライザのbufferingPolicy
のデフォルト値はunbounded
です。
※上記例において、タイミングによってバッファされる要素が変わるため、何回か実行を繰り返すと結果が変わることがあります