【UniRx】ストリームのメッセージ同士を比較する

More than 1 year has passed since last update.

UniRxについての記事のまとめはこちら


UniRxというよりは、本家Reactive Extensionsの内容です。

UniRxで実装を進めていく上で、「あるストリームを流れるメッセージ同士を比較したい」といった事をやりたくなることがあります。直前のフレームでのプレイヤ座標と今フレームでのプレイヤの座標を比較したい、みたいな。実はこういった処理はBufferオペレータを使うと簡潔に書くことができます。

というわけで今回はBufferのSkipの機能と、それを使った実装例を紹介したいと思います。

Bufferのオーバーロード

Bufferオペレータにはオーバーロードが幾つかあるのですが、そのうちの1つにSkip数を指定できるものがあります。Skipを指定することで、Bufferが放流したあとどのタイミングで再度放流するかの指定を行うことができます。Skipを指定しない場合、Bufferに指定したCountと同じ値がSkipとして設定されます。


Skipを指定しない場合
Observable.Range(1, 10)
    .Select(x=>x.ToString())
    .Buffer(2) //2個ずつまとめる(放流したあとに2個飛ばしてから次を放流する。Buffer(2,2)と同義)
    .Subscribe(x =>
    {
        //Bufferの内容を表示
        Debug.Log(x.Aggregate<string>((p, c) => p.ToString() + "," + c.ToString()));
    });
結果
1,2
3,4
5,6
7,8
9,10

Skipを指定した場合
Observable.Range(1, 10)
    .Select(x=>x.ToString())
    .Buffer(2,1) //2個ずつまとめる。放流後1個飛ばして放流する
    .Subscribe(x =>
    {
        //Bufferの内容を表示
        Debug.Log(x.Aggregate<string>((p, c) => p.ToString() + "," + c.ToString()));
    });
結果
1,2
2,3
3,4
4,5
5,6
6,7
7,8
9,10
10

Skipを指定した場合2
Observable.Range(1, 10)
    .Select(x=>x.ToString())
    .Buffer(3,2) //3個ずつまとめる。放流後2個飛ばして放流する
    .Subscribe(x =>
    {
        //Bufferの内容を表示
        Debug.Log(x.Aggregate<string>((p, c) => p.ToString() + "," + c.ToString()));
    });
結果
1,2,3
3,4,5
5,6,7
7,8,9
9,10

このBufferのSkipを利用することで、簡単にストリーム中のメッセージの比較や演算ができるようになります。

例)Bufferを使って直前のメッセージの差分を取る

Buffer(2,1)を使うと簡単に実現できます。(ZipとSkip(1)でゴリ押し実装している人をよく見かけますが、Bufferの方が素直に書けます)
ただし、OnCompletedが発行された際にBufferは値が揃ってなくても放出してしまう特性があるため、Whereで必要数揃ってない場合はカットするなどの処理を入れてあげる必要があります。

transform.Positionの差分を取る
this.UpdateAsObservable()
    .Select(_ => this.transform.position)
    .Buffer(2, 1)
    .Where(x => x.Count == 2) //OnCompleted時に1個だけ流れてくるのをカット
    .Select(x => x.Last() - x.First())
    .Subscribe(x => Debug.Log("Delta:" + x));

例)Bufferで過去nメッセージの値から平均値を算出する

Buffer(n,1)にLINQのAverageを組み合わせれば簡単に実現できます。

過去10フレームでのTime.deltaTimeの平均値をリアルタイムに算出する
this.UpdateAsObservable()
    .Select(_ => Time.deltaTime)
    .Buffer(10, 1)
    .Select(x => x.Average())
    .Subscribe(x => Debug.Log("Average:" + x));

応用:FPSカウンタを実装する

まとめ

Bufferはただ値を塞き止めるだけのオペレータではありません。この様にSkipを指定できたり、時間で区切ってまとめたりと汎用性が高いオペレータだったりします。

Bufferに限った話ではなく、同じオペレータでもオーバーロードで全然挙動が違ったりするものが多くあります。無理くりオペレータチェーンでロジックを実現する前に、似たオペレータが無いか探しそのオーバーロードを確認してみると良いかと思います。

追記

Buffer(2,1)と似た挙動をするPairwiseというオペレータもあります。
OnCompletedが発行された時の挙動が若干違う以外は同じ動作をするので、直前の値だけを使いたいならこちらを使うのも良いかもしれません。

Pairwise
//Pairwise()
Observable.Range(1, 10)
    .Pairwise()
    .Subscribe(x =>
        Debug.Log(string.Format("{0},{1}", x.Previous, x.Current))
    );

//Buffer(2,1)
Observable.Range(1, 10)
    .Select(x=>x.ToString())
    .Buffer(2,1)
    .Subscribe(x =>
    {
        //Bufferの内容を表示
        Debug.Log(x.Aggregate<string>((p, c) => p.ToString() + "," + c.ToString()));
    });
結果
(Pairwise)
1,2
2,3
3,4
4,5
5,6
6,7
7,8
9,10

(Buffer)
1,2
2,3
3,4
4,5
5,6
6,7
7,8
9,10
10 //←
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.