Azure SDKのコードでIAsyncEnumerable
が出てきて理解できなかったので、調べてみた。
公式のIterating with Async Enumerables in C# 8を読めば本来十分である。このポストは自分の理解のために記述する。
きっかけ
下記のようなコードが出てきて、ん? Async メソッドなのに、await とか、Task/ValueTask
とか無いってどういうこと?と思ったのがきっかけ。このメソッドの戻り値は、AsyncPageable
というクラスで、IAsyncEnumerable
を実装したクラスになっている。さて、どう使うのだろう?
var containers = blobServiceClient.GetBlobContainersAsync();
使い方
通常のTask/ValueTask
とは違っていて、イテレーターのbool MoveNext()
が、ValueTask<bool> MoveNextAsync()
となっているのを理解するとつかいかたが理解しやすい。
await foreach(var container in container)
{
Console.WriteLine($"container: {container.Name} is already exists.");
}
インターフェイスの定義
さきほどご紹介した公式そのままだが、これがIAsyncEnumerable
の定義になっている。イテレータを操作する側に、ValueTask
が戻るようになっている。また、IAsyncDisposable
を実装しており、こちらの戻り値も、ValueTask
になっている。このイメージがあるとかなり理解しやすい。
namespace System.Collections.Generic
{
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(
CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
ValueTask<bool> MoveNextAsync();
T Current { get; }
}
}
namespace System
{
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
}
ジェネレーターとしてのIAsyncEnumerable
IAsyncEnumerable を返却するメソッドを実装するとわかりやすい。結局のところ非同期のジェネレータだ。
yeild
があるので、ここに来るたびに結果を返す。このメソッド自体が、asyncとして定義されている。だから、一レコード毎の処理に時間がかかるようなもので使い勝手がよさそうだ。
private async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(100);
yield return start + i;
}
}
CancellationToken
を使いたいケースもあるだろう。次のように書くと良い。ちなみに、Task.Delay()
は、キャンセルが発生したときに、TaskCanceledException
が発生するので、今回はそれをキャッチしてメッセージを出すようにしている。このように[EnumeratorCancellation]
アトリビュートをつけておくと、キャンセルが発生したら、これ以上イテレータが値を返すのが実施されなくなる。ただ、break
をつけておかないと、このfor
ループ自体はまわることになるのでそちらの方が良いだろう。
private async IAsyncEnumerable<int> RangeWithCancellationAsync(int start, int count, [EnumeratorCancellation] CancellationToken token = default)
{
for (int i = 0; i < count; i++)
{
try
{
await Task.Delay(100, token);
}
catch (TaskCanceledException)
{
Console.WriteLine("Task was cancelled.");
break; // Without this break, iterator keep on going.
}
yield return start + i;
}
}
CancellationToken と使う方法
Caller 側としては次のように書くことができる。
await foreach(var number in RangeWithCancellationAsync(0, 10).WithCancellation(cts.Token))
{
// Expect that it will finish at 4.
Console.WriteLine(number);
}
もしくは別の書き方で下記のように書いても結果は同じだ。
await foreach(var number in RangeWithCancellationAsync(0, 10, cts.Token))
{
// Expect that it will finish at 4.
Console.WriteLine(number);
}
0
1
2
Task was cancelled.
Reactive Extension の使用
Azure SDK 以外に IAsyncEnumerableが使われているものとして、Reactive Extensions がある。
下記のようなサンプルが理解しやすいだろう。1秒ごとに5回のイベントが発生するので、それをIAsyncEnumerableで待ち受けて実行するだけのコードである。ToAsyncEnumerable Extension method を使う事で、IEnumerable
をIAsyncEnumerable
に変換することができる。System.Linq.Async
の nuget package にこれが含まれている。このパッケージには SelectAwait
や、WhereAwait
が存在して、Linqと一緒につかえるのでとても便利だ。
ちなみに、Reactive Extension
を使うためにはSystem.Reactive.Linq
の nuget package を使うと良いだろう。
// Reactive support IAsyncEnumerator
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Select(l => (int)l);
await foreach(var number in observable.ToAsyncEnumerable())
{
Console.WriteLine($"from Rx: {number}");
}
実行すると次のようになる。1秒ごとに from Rx: 1
... が表示される。
from Rx: 0
from Rx: 1
from Rx: 2
from Rx: 3
from Rx: 4
注意事項
公式に書かれていた注意事項としては、MoveNextAsync()
が実行中に、MoveNextAync()
や DisposeAsync()
を実行すると不正な状態になるそうです。
IAsyncEnumerable<int> src = ...;
IAsyncEnumerator<int> e = src.GetAsyncEnumerator();
try
{
while (await e.MoveNextAsync().TimeoutAfter(30)) // BUG!!
Use(e.Current);
}
finally { if (e != null) await e.DisposeAsync(); }
公式にのっていた上記のケースの場合、MoveNextAsync()
がタイムアウトした場合、finally の DisposeAsync()
が実行されますが、そのときは、CancellationTokenが発生しただけで、MoveNextAsync()自体の処理は実行中です。このような時に問題が起こります。Much about threading.
IAsyncEnumerable with Linq
Linqでつかえるインテグレーションは System.Linq.Async
のパッケージの拡張メソッドのリストは ここで見つけることができます。ドキュメントは無いですが、ソースでわかります。
SelectAwait
await が必要な処理を Linq で書くことができます。RangeAsync() は先ほどの例なので、戻り値は IAsyncEnumerable
です。これをチェーンして、await を必要とするロジックをチェーンすることが可能です。
var result = RangeAsync(0, 5).SelectAwait(async i =>
{
await Task.Delay(1);
return i * i;
});
await foreach(var number in result)
{
Console.WriteLine($"from SelectAwait: {number}");
}
また、次のような例では、Azure SDK のコンテナを全部とってきて、その中の Blob をリストするものですが、GetBlobAsync() は、IAsyncEnumerable
を返します。そういった、IAsyncEnumerable
のチェーンの例です。
var blobs = await containers
.SelectMany(c =>
{
var containerClient = blobServiceClient.GetBlobContainerClient(c.Name);
return containerClient.GetBlobsAsync();
}).ToListAsync();
ユースケース
GTP-4 先生によると次のようなユースーケースがあるそうです。
非同期データソースからのデータ取得:
外部データソース(データベース、ファイルシステム、ネットワークリソースなど)から非同期的にデータをフェッチする場合に有効です。例えば、データベースから大量のレコードを非同期的に読み込むときや、非同期 I/O 操作を使用して大きなファイルを順次読み取るときなどです。
リアルタイムのイベント処理:
リアルタイムのイベントやメッセージをストリームとして受け取り、それを非同期的に処理する場合にも適しています。例えば、WebSocket からのメッセージのストリーミングや、IoT デバイスからの連続的なデータ受信などのシナリオで使用できます。
リアクティブプログラミング:
Reactive Extensions (Rx) などのリアクティブプログラミングツールキットを使用する場合、非同期ストリームとしてデータを扱うことが多いです。このような場面で IAsyncEnumerator を使用して、非同期的なデータの列挙を行うことができます。
素晴らしい blog で説明されていた例は、Azure SDK がまさにそうなのですが、イテレーションの間に、await 処理が発生するケースです。Azure SDK だと、ページャのオブジェクトがあって、ContinuousTokenを使って、次のページを取りに行く必要があります。そういういケースでIAsyncEnumerable ははまります。
まとめ
自分のIAsyncEnumerable
と System.Linq.Async
の理解はまだまだな気がしますが、継続して調べていきたいと思います。
リソース
こちらが絶対的なリソースですが、ドキュメントが少ないので、ブログも役に立ちます。