10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

IAsyncEnumerable を理解する

Posted at

Azure SDKのコードでIAsyncEnumerableが出てきて理解できなかったので、調べてみた。
公式のIterating with Async Enumerables in C# 8を読めば本来十分である。このポストは自分の理解のために記述する。

きっかけ

下記のようなコードが出てきて、ん? Async メソッドなのに、await とか、Task/ValueTaskとか無いってどういうこと?と思ったのがきっかけ。このメソッドの戻り値は、AsyncPageable というクラスで、IAsyncEnumerableを実装したクラスになっている。さて、どう使うのだろう?

var containers = blobServiceClient.GetBlobContainersAsync();

使い方

通常のTask/ValueTaskとは違っていて、イテレーターのbool MoveNext()が、ValueTask<bool> MoveNextAsync() となっているのを理解するとつかいかたが理解しやすい。

await foreach(var container in container) 
{
    Console.WriteLine($"container: {container.Name} is already exists.");
}

インターフェイスの定義

さきほどご紹介した公式そのままだが、これがIAsyncEnumerableの定義になっている。イテレータを操作する側に、ValueTask が戻るようになっている。また、IAsyncDisposableを実装しており、こちらの戻り値も、ValueTaskになっている。このイメージがあるとかなり理解しやすい。

namespace System.Collections.Generic
{
  public interface IAsyncEnumerable<out T>
  {
    IAsyncEnumerator<T> GetAsyncEnumerator(
      CancellationToken cancellationToken = default);
  }
  public interface IAsyncEnumerator<out T> : IAsyncDisposable
  {
    ValueTask<bool> MoveNextAsync();
    T Current { get; }
  }
}
namespace System
{
  public interface IAsyncDisposable
  {
    ValueTask DisposeAsync();
  }
}

ジェネレーターとしてのIAsyncEnumerable

IAsyncEnumerable を返却するメソッドを実装するとわかりやすい。結局のところ非同期のジェネレータだ。
yeildがあるので、ここに来るたびに結果を返す。このメソッド自体が、asyncとして定義されている。だから、一レコード毎の処理に時間がかかるようなもので使い勝手がよさそうだ。

        private async IAsyncEnumerable<int> RangeAsync(int start, int count)
        {
            for (int i = 0; i < count; i++)
            {
                await Task.Delay(100);
                yield return start + i;
            }
        }

CancellationTokenを使いたいケースもあるだろう。次のように書くと良い。ちなみに、Task.Delay()は、キャンセルが発生したときに、TaskCanceledExceptionが発生するので、今回はそれをキャッチしてメッセージを出すようにしている。このように[EnumeratorCancellation]アトリビュートをつけておくと、キャンセルが発生したら、これ以上イテレータが値を返すのが実施されなくなる。ただ、breakをつけておかないと、このforループ自体はまわることになるのでそちらの方が良いだろう。

        private async IAsyncEnumerable<int> RangeWithCancellationAsync(int start, int count, [EnumeratorCancellation] CancellationToken token = default)
        {
            for (int i = 0; i < count; i++)
            {
                try
                {
                    await Task.Delay(100, token);
                }
                catch (TaskCanceledException)
                {
                    Console.WriteLine("Task was cancelled.");
                    break;  // Without this break, iterator keep on going.
                }
                yield return start + i;
            }
        }

CancellationToken と使う方法

Caller 側としては次のように書くことができる。

            await foreach(var number in RangeWithCancellationAsync(0, 10).WithCancellation(cts.Token))
            {
                // Expect that it will finish at 4.
                Console.WriteLine(number);
            }

もしくは別の書き方で下記のように書いても結果は同じだ。

            await foreach(var number in RangeWithCancellationAsync(0, 10, cts.Token))
            {
                // Expect that it will finish at 4.
                Console.WriteLine(number);
            }
実行結果
0
1
2
Task was cancelled.

Reactive Extension の使用

Azure SDK 以外に IAsyncEnumerableが使われているものとして、Reactive Extensions がある。
下記のようなサンプルが理解しやすいだろう。1秒ごとに5回のイベントが発生するので、それをIAsyncEnumerableで待ち受けて実行するだけのコードである。ToAsyncEnumerable Extension method を使う事で、IEnumerableIAsyncEnumerableに変換することができる。System.Linq.Asyncの nuget package にこれが含まれている。このパッケージには SelectAwait や、WhereAwaitが存在して、Linqと一緒につかえるのでとても便利だ。
ちなみに、Reactive Extension を使うためにはSystem.Reactive.Linqの nuget package を使うと良いだろう。

            // Reactive support IAsyncEnumerator
            var observable = Observable.Interval(TimeSpan.FromSeconds(1))
                .Take(5)
                .Select(l => (int)l);
            await foreach(var number in observable.ToAsyncEnumerable())
            {
                Console.WriteLine($"from Rx: {number}");
            }

実行すると次のようになる。1秒ごとに from Rx: 1 ... が表示される。

実行結果
from Rx: 0
from Rx: 1
from Rx: 2
from Rx: 3
from Rx: 4

注意事項

公式に書かれていた注意事項としては、MoveNextAsync() が実行中に、MoveNextAync()DisposeAsync()を実行すると不正な状態になるそうです。

IAsyncEnumerable<int> src = ...;
IAsyncEnumerator<int> e = src.GetAsyncEnumerator();
try
{
  while (await e.MoveNextAsync().TimeoutAfter(30)) // BUG!!
    Use(e.Current);
}
finally { if (e != null) await e.DisposeAsync(); }

公式にのっていた上記のケースの場合、MoveNextAsync()がタイムアウトした場合、finally の DisposeAsync()が実行されますが、そのときは、CancellationTokenが発生しただけで、MoveNextAsync()自体の処理は実行中です。このような時に問題が起こります。Much about threading.

IAsyncEnumerable with Linq

Linqでつかえるインテグレーションは System.Linq.Asyncのパッケージの拡張メソッドのリストは ここで見つけることができます。ドキュメントは無いですが、ソースでわかります。

SelectAwait

await が必要な処理を Linq で書くことができます。RangeAsync() は先ほどの例なので、戻り値は IAsyncEnumerableです。これをチェーンして、await を必要とするロジックをチェーンすることが可能です。

            var result = RangeAsync(0, 5).SelectAwait(async i =>
            {
                await Task.Delay(1);
                return i * i;
            });

            await foreach(var number in result)
            {
                Console.WriteLine($"from SelectAwait: {number}");
            }

また、次のような例では、Azure SDK のコンテナを全部とってきて、その中の Blob をリストするものですが、GetBlobAsync() は、IAsyncEnumerableを返します。そういった、IAsyncEnumerableのチェーンの例です。

var blobs = await containers
    .SelectMany(c =>
    {
        var containerClient = blobServiceClient.GetBlobContainerClient(c.Name);
        return containerClient.GetBlobsAsync();
    }).ToListAsync();

ユースケース

GTP-4 先生によると次のようなユースーケースがあるそうです。

非同期データソースからのデータ取得:

外部データソース(データベース、ファイルシステム、ネットワークリソースなど)から非同期的にデータをフェッチする場合に有効です。例えば、データベースから大量のレコードを非同期的に読み込むときや、非同期 I/O 操作を使用して大きなファイルを順次読み取るときなどです。

リアルタイムのイベント処理:

リアルタイムのイベントやメッセージをストリームとして受け取り、それを非同期的に処理する場合にも適しています。例えば、WebSocket からのメッセージのストリーミングや、IoT デバイスからの連続的なデータ受信などのシナリオで使用できます。

リアクティブプログラミング:

Reactive Extensions (Rx) などのリアクティブプログラミングツールキットを使用する場合、非同期ストリームとしてデータを扱うことが多いです。このような場面で IAsyncEnumerator を使用して、非同期的なデータの列挙を行うことができます。

素晴らしい blog で説明されていた例は、Azure SDK がまさにそうなのですが、イテレーションの間に、await 処理が発生するケースです。Azure SDK だと、ページャのオブジェクトがあって、ContinuousTokenを使って、次のページを取りに行く必要があります。そういういケースでIAsyncEnumerable ははまります。

まとめ

自分のIAsyncEnumerableSystem.Linq.Async の理解はまだまだな気がしますが、継続して調べていきたいと思います。

リソース

こちらが絶対的なリソースですが、ドキュメントが少ないので、ブログも役に立ちます。

10
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?