やりたいこと
C#8.0で追加される予定の 非同期ストリーム でどんなことができるのか、どんな書き方をするのかを知っておこう、というのがこの記事の趣旨です。
気になる点があれば、遠慮なくコメントしてください。
ずばり、できるようになったことは?
非同期ストリーム は「複数の値を順に扱いたい」かつ「非同期な処理をシンプルに書きたい」というのを実現させるための機能だといえそうです。
「複数の値を順に扱う」というのは、イテレータの機能で、従来よりyield
やforeach
などで知られている機能のことです。
「非同期な処理をシンプルに書きたい」というのは、C#5.0の頃に追加された非同期機能で、async
,await
およびTask
などとして知られている機能のことです。
つまり、既存の2つの機能をうまく融合させた機能が「非同期ストリーム」なのです。
できなかったこと
これまでは1つのメソッドで、async
/await
キーワードと、yield
キーワードの共存ができませんでした。
// [C#7.3まで]async/await と yield は共存できない
// async IEnumerable<int> DoAsync(){
C#8.0からはこれが書ける
async
メソッド、かつ、IAsyncEnumerable<T>
またはIAsyncEnumerator<T>
型をreturn
するメソッドを書いた場合、メソッドの内部でyield
キーワードを利用できるようになりました。 つまり async
/await
キーワードと、yield
キーワードが共存できます。
async IAsyncEnumerable<int> DoAsync(){
await Task.Delay(1000);
yield return 1;
}
同期イテレータとの対応
同期 | 非同期 |
---|---|
IEnumerable<T> |
IAsyncEnumerable<T> |
┗ .GetEnumerator()
|
┗ .GetAsyncEnumerator()
|
IEnumerator<T> |
IAsyncEnumerator<T> |
┗ .Current
|
┗ .Current
|
┗ .MoveNext()
|
┗ .MoveNextAsync()
|
IDisposable |
IAsyncDisposable<T> |
┗ .Dispose()
|
┗ .DisposeAsync()
|
気づき:MoveNextAsync()
では ValueTask
が使用されている。
非同期ストリームを消費する
IAsyncEnumerable<T>
がreturn
されても、これを使う側がないと片手落ちです。IEnumerable<T>
を使うためにforeach
を使っていたのと同様、IAsyncEnumerable<T>
でも、await foreach
が使えるようになりました。
これも違和感なく使える構文になっていると思います。
await foreach(var item in asyncStream) {
Console.WriteLine(item);
}
どんな動きになるのかな?
パターン1
async IAsyncEnumerable<int> DoAsync() {
foreach(var item in Enumerable.Range(1, 100).Select(x => x * 5)) {
await Task.Delay(item);
yield return item;
}
}
async ValueTask DoSomething() {
await foreach(var item in DoAsync()) {
Console.WriteLine(item);
}
}
これは動きが予想しやすいと思います。
5,10,15,...の数字が初めは勢いよく、徐々にスローダウンしながら表示されます。
パターン2
public static class Ex{
// 受けとったTaskの配列をIAsyncEnumerableとしてreturnする拡張メソッド
public static IAsyncEnumerable<T> AsAsyncEnumerable<T>(
this IEnumerable<Task<T>> tasks
) => tasks switch {
null => throw new ArgumentNullException(nameof(tasks)),
IEnumerable<Task<T>> ts => ts.AsAsyncEnumerableImpl(),
};
static async IAsyncEnumerable<T> AsAsyncEnumerableImpl<T>(
this IEnumerable<Task<T>> tasks
) {
foreach(var task in tasks) {
yield return await task;
}
}
}
async ValueTask DoSomething() {
var tasks = new List<Task<int>>();
tasks.Add(Task.Delay(1000).ContinueWith(_ => 1));
tasks.Add(Task.Delay(3000).ContinueWith(_ => 2));
tasks.Add(Task.Delay(5000).ContinueWith(_ => 3));
tasks.Add(Task.Delay(2000).ContinueWith(_ => 4));
tasks.Add(Task.Delay(4000).ContinueWith(_ => 5));
await foreach(int item in
AsyncEnumerableEx.AsAsyncEnumerable(tasks)) {
Console.WriteLine(item);
}
}
Task
を使い慣れている人ならば、間違えないと思いますが、開始からおよそ1秒後に1が、そのおよそ2秒後に2が、そのおよそ2秒後に3,4,5が表示されます。
List
にAdd
した時点でTask
が動き始めていることが重要ですね。
今回はTask.Delay()
のような無駄な処理を呼び出していますが、これがダウンロード処理と考えれば使い道は多そうな気がします。非同期処理とは言っていますが、結果は最初に指定した順に取り出されるので、処理も追いかけやすいはずです。
まとめ
- 非同期ストリームは、既存の2機能「イテレータ」と「非同期処理」の力強い融合
-
IAsyncEnumerable<T>
またはIAsyncEnumerator<T>
- 同期版のインタフェースやメソッドに
Async
が付いただけなので、違和感なく扱えそう。
- 同期版のインタフェースやメソッドに
- 列挙するときは、
await foreach
を使う。 - 非同期ストリーム機能の追加自体に難しいことはなさそう。難しいとすれば、それは非同期処理そのもの。
※C#8.0は現在Preview版です。Preview版で動作確認の上、当記事を書いていますが、記事の内容と異なる構文になる可能性もありますのでご了承ください。
今回書けなかったこと
- 非同期ストリームの例外処理や
CancellationToken
-
IAsyncEnumerable<T>
とIAsyncEnumerator<T>
の自力実装 -
await foreach
のダックタイピング的な挙動 - 非同期LINQ的なこと
過去記事
- C#8.0ネタは、いずれどこかにリンクをまとめておきたい。