C#
LINQ

LINQでBufferをやってみる話

IEnumerable<T>を指定件数で分割するという記事が何件か投稿されてたので自分でもやってみよう!という話
RxのBuffer使えばいいじゃん?と言うのはナシ。中身は参照してるけども。
記事の内容に誤りがあればバシバシ叩いてください。

2017.10.17追記
幾つかの実装にガード節の誤りがあったので修正。
普通のListバッファを使った実装を2通り追加。

なお本記事ではBufferという、また別のメソッド名を利用してるが、これは前述の通りReactive Extensionsの同名メソッドに倣うということで採用した。

実装を始める前に

そもそもBufferにはどんな仕様が要求されるのか、関連インターフェースにはどんな制約があるのかを考えてみる。

IEnumerable<T>

そのインスタンスに対するイテレータを公開する。
List<T>Dictionary<TKey,TValue>のような、.NetのコレクションはすべてIEnumerable<T>を実装している。
LINQのオペレータはIEnumerable<T>の拡張メソッドとして定義されている。

IEnumerator<T>

IEnumerable<T>.GetEnumerator()メソッドにより取得できるイテレータの実体。
主に現在の要素を返すCurrentプロパティと、次の要素を検索するMoveNext()メソッドからなる。

IEnumerator<T>の重要な性質として、もとのコレクションの変更を認めないという点があげられる。

IEnumerator Interface (System.Collections.Generic) | Microsoft Docs

An enumerator remains valid as long as the collection remains unchanged. If changes are made to the collection, such as adding, modifying, or deleting elements, the enumerator is irrecoverably invalidated and the next call to MoveNext or Reset throws an InvalidOperationException.

つまり、GetEnumerator()IEnumerator<T>インスタンスを生成したあと、もとのデータソースを変更してしまうとIEnumerator<T>インスタンスは壊れてしまうということ。
後述するが、これはLINQの性質と密接に関係がある。

コルーチン

C#ではIEnumerable<T>の実装、あるいはIEnumerable<T>を返すメソッドの実装にyieldキーワードによりコルーチンを利用できる。
例えば、次のように記述することができる。

YieldSample.cs
class FooClass
{
    // {0*a, 1*a,2*a}を返す。
    public IEnumerable<int> GetFooEnumerable(int a)
    {
        yield return 0 * a;
        yield return 1 * a;
        yield return 2 * a;
    }
}

yieldはかなり複雑な糖衣構文で、コンパイラがステートマシンを自動生成する。
前述のコードから自動生成されたクラスを読みやすく書き直すと以下のようになる。
本来の生成コードはC#では認められない文法(ILレベルでは正当)を含んでいるが、ここではちゃんとC# 7.0でコンパイルが通るところまでいじっている。

YieldSample_GeneratedCode.cs
class FooClass
{
    private sealed class FooEnumerable : IEnumerable<long>, IEnumerator<long>
    {

        private int _state;

        private long _current;

        private int _initialThreadId;

        private int _a;

        public int A
        {
            get => _a;
            set => _a = value;
        }

        public FooClass _this;

        long IEnumerator<long>.Current => _current;

        object IEnumerator.Current => _current;



        public FooEnumerable(int state)
        {
            _state = state;
            _initialThreadId = Environment.CurrentManagedThreadId;
        }



        void IDisposable.Dispose() { }



        bool IEnumerator.MoveNext()
        {
            switch (_state)
            {
                    case 0:
                        _state = -1;
                        _current = 0;
                        _state = 1;
                        return true;
                    case 1:
                        _state = -1;
                        _current = _a;
                        _state = 2;
                        return true;
                    case 2:
                        _state = -1;
                        _current = 2 * _a;
                        _state = 3;
                        return true;
                    case 3:
                        _state = -1;
                        return false;
                    default:
                        return false;
            }
        }



        void IEnumerator.Reset()
            => throw new NotSupportedException();



        IEnumerator<long> IEnumerable<long>.GetEnumerator()
        {
            FooEnumerable retval;
            if ( _state == -2 &&  _initialThreadId == Environment.CurrentManagedThreadId)
            {
                _state = 0;
                retval = this;
            }
            else
            {
                retval = new FooEnumerable(0) { _this = _this };
            }

            retval._a =  A;
            return retval;
        }



        IEnumerator IEnumerable.GetEnumerator()
            => (this as IEnumerable<long>).GetEnumerator();

    }



    public IEnumerable<long> GetFooEnumerable(int a)
    {
        var retval = new FooEnumerable(-2)
        {
            _this = this,
            A = a
        };
        return retval;
    }

}

状態変数_stateフィールドで次にどのyield returnを返すのかを記憶し、その値に応じてMoveNext()メソッド内で_currentフィールドの値を書き換えている。
もとのコードでループを使っていればMoveNext()の中身はもっと複雑になる。

注目してもらいたいのは_thisフィールドと_aフィールドだ。
これはもとのYieldSample.csのGetFooEnumerable(int a)メソッドの引数である。1
このように、コルーチンメソッドの引数やローカル変数は、自動生成されたEnumeratorのフィールドに転送される。

LINQ

統合言語クエリ。
データ集合に対する一律な問い合わせ手法をまとめた機能群のこと。
関数型プログラミングのエッセンスを取り入れており、データ加工を順序立てて表現できるというメリットがある。
.NetにおいてはIEnumerable<T>の拡張メソッドとして提供されている。

LINQの重要な性質は、それが遅延評価されるということだ。
LINQオペレータを積み上げただけでは結果は確定せず、実際にイテレーションを開始することで最終的なデータが出来上がる。
これにより、LINQでは必要なデータだけが評価されるようになっている。
膨大なデータソース(場合によっては無限)に対して必要なデータが少ない場合にはパフォーマンスの観点からもパワフルなシステムなのだ。

だが先述したIEnumerator<T>の性質のため、実際にすべての出力を評価し終えるよりも前に論理的な評価結果は確定している。
評価を開始した後はもとのデータソースを変更することができなくなるからである。
つまり、イテレーションを開始した時点でその出力は一意に決まるのだ。2
キャッシュを利用するならこのことを頭の片隅にでも置いておいたほうがいいだろう。

Bufferオペレータの仕様

これらLINQの仕様を踏まえた上でBufferの仕様を決めていきたい。


シグニチャと制約
IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)

  • source : 入力シーケンス。非null。
  • count : 正の整数。
  • return value : sourceの要素をcount個ずつまとめたイテラブルを列挙するイテラブル。

Bufferを実装するにあたり求められる最低限の仕様。ここは多分誰も異存ないだろうと思う。


遅延評価

  • sourceは遅延評価する。

さらっと言ったが実はここが結構複雑。というのも、Bufferが取り扱うべきIEnumerableは2通りあるからだ。
すなわち、次のようなコードがあったとき、

BufferSample.cs
var buffered = source.Buffer(5);         // (0)
var chunk    = buffered.ElementAt(2);    // (1)
var value    = buffer.ElementAt(3);      // (2)

(1)と(2)のどちらで出力が確定しているべきか、という点だ。
本記事で列挙した実装例でもこの部分の取り扱いはまちまちだ。
いずれにせよ、(0)のタイミングで即時評価はしない、という点では共通している。

個人的には(1)のタイミングで評価完了している方が自然だと思う。
source.GetEnumerator()が実行されているし、RxのBufferも事実そのような実装になっている。


戻り値の性質

  • 剰余が出た場合は末尾に余りの分のイテラブル要素を付ける。
  • 剰余がない場合には末尾に空のイテラブル要素を付けない。
  • 戻り値自体に、また戻り値の各要素に対してイミュータブル。

最後の項目は、元のソースが不変である限り何回評価しても同じであるということだ。
一度評価すると壊れてしまうと言った制約をつければより高効率化できるかもしれないが、そういった制約は設けないということである。


ベンチマーク

今回利用したベンチマーク用のメソッド。
ソースの要素数に対してどれほど計算時間を要するかを確認する。
ちゃんとGCも立ち上げてテスト間の依存性を減らすよう心がける。

BufferBenchmark.cs
/// <summary> Bufferの戻り値の評価 </summary>
public static double Benchmark1(int count)
{
    GC.Collect();
    GC.WaitForPendingFinalizers();

    var sw = new Stopwatch();
    var enumerable = Enumerable.Range(0, count).Buffer(10).Last();
    sw.Start();
    for (int i = 0; i < 1000000; ++i)
        enumerable.Last();

    sw.Stop();
    return 1000.0 * sw.ElapsedTicks / Stopwatch.Frequency;
}

/// <summary> Buffer + Bufferの戻り値の評価 </summary>
public static double Benchmark2(int count)
{
    GC.Collect();
    GC.WaitForPendingFinalizers();

    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < 100; ++i)
        Enumerable.Range(0, count).Buffer(10).Last().Last();

    sw.Stop();
    return 1000.0 * sw.ElapsedTicks / Stopwatch.Frequency;
}

実装例

I. Skip & Take その1

Buffer1.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if(source == null)
        throw new ArgumentNullException(nameof(source));
    if(count <= 0 )
        throw new ArgumentOutOfRangeException(nameof(count));

    IEnumerable<IEnumerable<T>> BufferImpl()
    {
        for (; source.Any(); source = source.Skip(count))
            yield return source.Take(count);
    }
    return BufferImpl();
}

多分これが一番シンプルな実装。
中身も非常にLINQらしい仕上がりとなっている。
手元でさらっと実装して使うときにはこの実装を利用している人も多いと思う。
この実装では遅延評価の確定タイミングは(2)である。

II. Skip & Take その2

その1の実装は短くまとまっているが、パフォーマンス的によろしいとはいえない。
ベンチマークを取ってみると計算時間はソースの要素数に対して$O(n^3)$程度にもなっている。
原因はSkipオペレータが多段化してしまっていることにある。
後ろの方のイテラブルはSkipのネストが深くなりすぎてとても重くなる。
そこでスキップ数の方をカウントアップし、常にSkip1段のみになるよう書き直す。

Buffer2.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if(source == null)
        throw new ArgumentNullException(nameof(source));
    if(count <= 0 )
        throw new ArgumentOutOfRangeException(nameof(count));

    IEnumerable<IEnumerable<T>> BufferImpl()
    {
        for (int i = 0; source.Skip(i).Any() ; i += count)
            yield return source.Skip(i).Take(count);
    }
    return BufferImpl();
}

こちらは$O(n^2)$程度なので、前項のものと比べるとかなり速い。
ほんの僅かに記述量が増えたものの、パフォーマンス向上の恩恵に比べれば安いものじゃなろうか。

こちらの実装でも遅延評価の確定タイミングは(2)である。

III. Arrayバッファ

先行記事で紹介されていた実装。
アルゴリズムの勉強と思って、一応丸コピペではなく自分流に書き直してある。

Buffer3.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count));

    IEnumerable<IEnumerable<T>> BufferImpl()
    {
        using (var enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                var array = new List<T>(count);
                for (int i = 0; i < count; ++i)
                {
                    array.Add(enumerator.Current);
                    if (!enumerator.MoveNext())
                        break;
                }

                if (array.Count != 0)
                    yield return array;
            }
        }
    }
    return BufferImpl();
}

分割後の各イテラブルが始まるタイミングで要素数countの配列を生成、シーケンスをインクリメントしながら配列を詰めていき、全て埋まったところでyield return。
評価タイミングは(1)。

IV. Listバッファ その1

III.の実装をArrayからListに置き換えたもの。Addが使えるので若干表現が変わる。

Buffer4.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count));

    IEnumerable<IEnumerable<T>> BufferImpl()
    {
        using (var enumerator = source.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                var list = new List<T>(count);
                for (int i = 0; i < count; ++i)
                {
                    list.Add(enumerator.Current);
                    if (!enumerator.MoveNext())
                        break;
                }

                if (list.Count != 0)
                    yield return list;
            }
        }
    }
    return BufferImpl();
}

分割後の各イテラブルが始まるタイミングで要素数countの配列を生成、シーケンスをインクリメントしながら配列を詰めていき、全て埋まったところでyield return。
評価タイミングは(1)。

V. Queue & Listバッファ

Reactive Extensionsで用意されているアルゴリズム。
Rxを導入しているならこれ使っとけばええねんという感じだが、それはそれ。
RxのBufferは単純にシーケンスを分割するだけでなく、出力される各シーケンスがどのような間隔(skip引数)をとるかを指定するオーバーロード3を持っており、それを踏まえた実装になっている。

Buffer5.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count));

    return source.BufferImpl(count, count);
}

private static IEnumerable<IEnumerable<T>> BufferImpl<T>(this IEnumerable<T> source, int count, int skip)
{
    var buffers = new Queue<IList<T>>(Math.Max(1, count-skip) + 1);

    var i = 0;
    foreach (var item in source)
    {
        if (i % skip == 0)
            buffers.Enqueue(new List<T>(count));

        foreach (var buffer in buffers)
            buffer.Add(item);

        if (buffers.Count > 0 && buffers.Peek().Count == count)
            yield return buffers.Dequeue();

        i++;
    }

    while (buffers.Count > 0)
        yield return buffers.Dequeue();
}

方針としては、Arrayバッファの時と同様イテラブルの先頭要素に来たときに新しいバッファを生成し、要素が必要数溜まった段階で放流する。
異なっているのはListを使っていることと、Queueにより複数のイテラブルを並行的に生成できることだ。

このアルゴリズムの評価タイミングは(1)。

VI. Listバッファ その2

V.の実装はオーバーロードのためにQueueを用意しており、追加のコストを払っている。
今回話題にしている、count個ごとのシーケンスの分割だけを考えるならList一つで十分なはずだと指摘を受け、急遽実装。

Buffer6.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count));

    return source.BufferImpl(count, count);
}

private static IEnumerable<IEnumerable<T>> BufferImpl<T>(this IEnumerable<T> source, int count)
{
    var buffers = new List<T>(count);

    foreach (var item in source)
    {
        buffer.Add(item);

        if (buffer.Count == count)
        {
            yield return buffer;
            buffer = new List<T>(count);
        }
    }

    if (buffers.Count > 0)
        yield return buffer;
}

このアルゴリズムの評価タイミングは(1)。

VII. 単一Listバッファ

最後に自作のアルゴリズムを一つ。

Buffer7.cs
public static IEnumerable<IEnumerable<T>> Buffer<T>(this IEnumerable<T> source, int count)
{
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count));

    return source.BufferImpl(count, count);
}

private static IEnumerable<IEnumerable<T>> BufferImpl<T>(this IEnumerable<T> source, int count, int skip)
{
    var buffer = new List<T>();
    using (var enumerator = source.GetEnumerator())
    {
        int i = 0;
        for (; i < count && enumerator.MoveNext(); ++i)
            buffer.Add(enumerator.Current);

        for (; enumerator.MoveNext(); ++i)
        {
            if ((i - count) % skip == 0)
                yield return new SubList<T>(buffer, i - count, count);
            if (!enumerator.MoveNext())
                break;
            buffer.Add(enumerator.Current);
        }

        if ((i - count) % skip != 0)
            yield return new SubList<T>(buffer, i - (i % skip));
    }
}

最初のfor文は、count個バッファに溜まっていない状態では放流を開始しないという条件分岐の計算コストを減らすためのもの。
2つ目のfor文とマージしてもよいが、どうせなら余計な計算コストは避けたいものだ。

このアルゴリズムでは、バッファには単一のリストを利用している。
SubList<T>IList<T>のデコレータで、startcountという2つのパラメータを用いてインデックスにオフセットをかけた部分リストを表現する。あくまでデコレータなので自身にアロケーションコストはない。
区間を抽出できさえすればわざわざ新しいバッファを用意する必要がなく、またIListであれば区間抽出の計算コストが$O(1)$にできるという訳だ。
調子に乗ってクソでかいソースに対して使うとListの内部配列がLOH送りになってパフォーマンスに影響が出るかもしれないが・・・。

このアルゴリズムも評価タイミングは(1)である。

ベンチマーク

というわけでベンチマークを取ってみる。
とりあえず自分のマシンで回した結果は以下の通り。

スペック
- CPU : Intel Core i7-6700K 4.00GHz (64bit)
- RAM : 32.0GB
- OS : Windows 10 Home
- 処理系 : .NET Framework 4.6.1 64bit

  • Benchmark1
count = 100 500 1000 5000 10000 10^5 10^6 10^7
Skip & Take 1 4555ms 100036ms 379809ms X X X X X
Skip & Take 2 598ms 1582ms 2945ms 13857ms 27589ms X X X
Array 24.3ms 24.0ms 24.7ms 30.2ms 27.3ms 28.4ms 28.5ms 32.1ms
List 1 7.67ms 6.67ms 6.81ms 6.65ms 6.71ms 6.68ms 6.67ms 6.52ms
Queue & List 7.70ms 6.54ms 7.39ms 6.65ms 6.70ms 6.54ms 6.67ms 6.53ms
List 2 6.08ms 6.08ms 6.15ms 6.70ms 6.73ms 6.59ms 7.07ms 6.42ms
Single List 7.99ms 7.64ms 7.65ms 7.57ms 7.57ms 8.49ms 7.87ms 8.01ms
  • Benchmark2
count = 100 500 1000 5000 10000 10^5 10^6 10^7
Skip & Take 1 2.31ms 186ms 1370ms 159076ms X X X X
Skip & Take 2 0.29ms 4.35ms 23.8ms 348ms 1530ms 136629ms X X
Array 0.07ms 0.49ms 0.58ms 3.23ms 6.01ms 60.2ms 582ms 5635ms
List 1 0.11ms 0.50ms 1.01ms 3.50ms 7.00ms 82.6ms 696ms 6763ms
Queue & List 0.33ms 1.36ms 2.70ms 13.5ms 30.9ms 275ms 2718ms 27272ms
List 2 0.09ms 0.40ms 0.77ms 3.93ms 7.88ms 81.1ms 801ms 7397ms
Single List 0.11ms 0.38ms 0.76ms 3.84ms 7.60ms 98.2ms 838ms 8538ms

※ X印はあまりの遅さにベンチマーク完走を断念したことを表します。遅すぎるわ!

Skip & Take その1、走らせてみるとこれはちょっと深刻に遅い・・・。
その2の方はその1に比べればかなりまし。下3つに比べればかなり不満な数字だが、評価タイミング(2)の実装でこれより速いのはちょっと思いつかない。

評価タイミング(1)の実装だとBenchmark1の方ではArrayバッファが若干遅い。
しかし、これらの実装にとってBenchmark1は単にリストの末尾要素にアクセスするだけの操作になっているはず。
詳しくはよくわからないが、IEnumerable<T>としてアクセスする場合にはArrayよりListの方が速いみたいだ。

Benchmark2ではArrayバッファが最速だった。
List 1とList 2の比較を見る限り、先行記事の実装は制御構造的にも速いようだ。

RxのBuffer実装は存外速くない。
Queue & ListとList 2の差が、Rxがオーバーロードのために払っているコストであり、それは決して小さくはないようである。

今回実装したSingle ListはBenchmark1でListを返すものに比べごく僅かに遅い傾向があり、Benchmark2でもListには敵わない。
これは、デコレータを挟んだことでcallvirt命令が一つ余分に入っているせいなのではと思っている。
でも、オーバーロードに対応する前提でいえばQueue & Listの実装よりもずっと速い。

結論

先行記事で紹介されていた実装はさすがの速さだった。
ArrayではIEnumerable<T>としてのアクセスに若干の不利があるようだが、Benchmark1のようなユースケースではそもそも即ToListなりするだろうから問題になりにくいだろう。
評価タイミング(1)の実装においてはBenchmark2の結果のほうが重要だ。
とは言えArrayバッファの実装はオーバーロードへの応用に難があるので、その点では単一Listバッファにもメリットがあるんじゃないかな。

Skip & Take その1の実装は色んなブログで見かけた記憶があるが、これはちょっと一考すべきパフォーマンスの悪さだと思う。
それほどシビアなユースケースでなくとも、可能な限りその2の実装に切り替えたほうがいい。
そのSkip & Takeその2の実装もお世辞には高速と言えないが、分割後の各イテラブルにまで遅延評価がかかる実装はこれしかない。
もしもそういうユースケースがあるなら諦めて使わざるを得ないだろう。(正直そんなケースは思いつかないが)

かなり関係ないところからスタートしてだいぶ遠回りしたが、ライブラリレイヤーではこういった厳密な仕様固めが大事だと思う。
ドキュメントを突き詰めてインターフェースの仕様をしっかり理解することに努めたのはいい経験になったかな。

最後に

普通にコーディングしてる分にはRx使えばいいと思うよ。
ビバ、メジャーライブラリ。



  1. C#では、というよりILではthisは暗黙のarg.0 (0番目の引数)である。 

  2. 純粋でないオペレータを使っていればその限りではないが、そのようなケースは仕様外扱いで構わないと思う。 

  3. Rx版(IObservable)では更に時間幅を引数に取るオーバーロードも存在するが、Ix版(IEnumerable)に同等の機能はない。IEnumerableの出力に時間的分布は考慮されない(もちろん、激遅コルーチンで無理やり時間差を付けることはできるが)ので当然といえば当然である。