LoginSignup
4
5

More than 3 years have passed since last update.

非同期ストリーム IAsyncEnumerable<T> から一定個数ずつ列挙する

Last updated at Posted at 2020-02-20

このドキュメントの内容

C# 8.0 で追加された非同期ストリーム(IAsyncEnumerable<T>)から一定個数ずつオブジェクトを列挙する方法を紹介します。一定個数ずつバッファリングしながら処理を行いたい、バッファリングの手段はそれぞれの処理の実装に委ねたいというような場面で使うことを想定しています。

IAsyncEnumerable<T> インターフェースに対する拡張メソッドとして実装しました。

Chunkメソッド
/// <summary>
/// 指定された非同期ストリームから一定個数ずつオブジェクトを列挙します。
/// </summary>
/// <param name="enumerable">列挙対象の非同期ストリーム</param>
/// <param name="chunkSize">一度に取得する個数</param>
/// <param name="cancellationToken">キャンセルトークン</param>
/// <returns>一定個数ずつオブジェクトを列挙する非同期ストリーム</returns>
public static IAsyncEnumerable<IAsyncEnumerable<T>> Chunk<T>(
    this IAsyncEnumerable<T> enumerable
    , int chunkSize
    , CancellationToken cancellationToken = default
)

サンプルコード

あまり現実的な内容ではありませんが、ArrayPool<T> と Span<T> を使って一定個数ずつバッファリングする例です。

private static async Task ChunkEnumerateSample()
{
    SampleData[] buffer = ArrayPool<SampleData>.Shared.Rent(3);
    try
    {
        // 3つずつ列挙する
        await foreach (var chunk in GetSampleData().Chunk(3))
        {
            Console.WriteLine("----- start chunk -----");
            await DoSomethingAsync(chunk, buffer);
            Console.WriteLine("----- end chunk -----");
        }
    }
    finally
    {
        ArrayPool<SampleData>.Shared.Return(buffer);
    }
}

private static async Task DoSomethingAsync(IAsyncEnumerable<SampleData> enumerable, SampleData[] buffer)
{
    int count = 0;
    await foreach (var obj in enumerable)
    {
        buffer[count++] = obj;
    }
    DoSomething(buffer, count);
}

private static void DoSomething(SampleData[] buffer, int length)
{
    var span = new Span<SampleData>(buffer, 0, length);

    for (int i = 0; i < span.Length; ++i)
    {
        Console.WriteLine($"{span[i].Value}");
    }
}

// 10個のオブジェクトを返す
private static async IAsyncEnumerable<SampleData> GetSampleData()
{
    await Task.Yield();

    for (int i = 0; i < 10; ++i)
    {
        yield return new SampleData(i);
    }
}

public class SampleData
{
    public SampleData(int value)
    {
        Value = value;
    }
    public int Value { get; }
}

拡張メソッド Chunk の実装内容

public static class AsyncEnumerableExtensions
{
    /// <summary>
    /// 指定された非同期ストリームから一定個数ずつオブジェクトを列挙します。
    /// </summary>
    /// <param name="enumerable">列挙対象の非同期ストリーム</param>
    /// <param name="chunkSize">一度に取得する個数</param>
    /// <param name="cancellationToken">キャンセルトークン</param>
    /// <returns>一定個数ずつオブジェクトを列挙する非同期ストリーム</returns>
    public static IAsyncEnumerable<IAsyncEnumerable<T>> Chunk<T>(
        this IAsyncEnumerable<T> enumerable
        , int chunkSize
        , CancellationToken cancellationToken = default
    )
    {
        return Chunk(enumerable, chunkSize, new NullState(), cancellationToken);
    }

    /// <summary>
    /// 指定された非同期ストリームから一定個数ずつオブジェクトを列挙します。
    /// </summary>
    /// <param name="enumerable">列挙対象の非同期ストリーム</param>
    /// <param name="chunkSize">一度に取得する個数</param>
    /// <param name="state">列挙状態を管理するオブジェクト</param>
    /// <param name="cancellationToken">キャンセルトークン</param>
    /// <returns>一定個数ずつオブジェクトを列挙する非同期ストリーム</returns>
    public static async IAsyncEnumerable<IAsyncEnumerable<T>> Chunk<T>(
        this IAsyncEnumerable<T> enumerable
        , int chunkSize
        , IAsyncEnumerablorState state
        , [EnumeratorCancellation]CancellationToken cancellationToken = default
    )
    {
        await using IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator(cancellationToken);

        while (!state.IsEof)
        {
            yield return new ChunkAsyncEnumerable<T>(enumerator, chunkSize, state);
        }
    }

    private class NullState : IAsyncEnumerablorState
    {
        bool IAsyncEnumerablorState.IsEof { get; set; }
        void IAsyncEnumerablorState.OnMoveNext() {}
    }

    private readonly struct ChunkAsyncEnumerable<T> : IAsyncEnumerable<T>
    {
        internal ChunkAsyncEnumerable(
            IAsyncEnumerator<T> enumerator
            , int chunkSize
            , IAsyncEnumerablorState state
            )
        {
            m_Enumerator = enumerator;
            m_ChunkSize = chunkSize;
            m_State = state;
        }

        private readonly IAsyncEnumerator<T> m_Enumerator;
        private readonly int m_ChunkSize;
        private readonly IAsyncEnumerablorState m_State;

        private async IAsyncEnumerable<T> Enumerate()
        {
            int count = 0;

            while (await m_Enumerator.MoveNextAsync().ConfigureAwait(false))
            {
                yield return m_Enumerator.Current;
                ++count;
                m_State.OnMoveNext();
                if (count >= m_ChunkSize) { yield break; }
            }

            m_State.IsEof = true;
        }

        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            return Enumerate().GetAsyncEnumerator(cancellationToken);
        }
    }

}

public interface IAsyncEnumerablorState
{
    bool IsEof { get; set; }
    void OnMoveNext();
}

サンプルコードの実行結果

3個ずつ列挙され、最後の端数も正しく列挙されています。

----- start chunk -----
0
1
2
----- end chunk -----
----- start chunk -----
3
4
5
----- end chunk -----
----- start chunk -----
6
7
8
----- end chunk -----
----- start chunk -----
9
----- end chunk -----
4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5