3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【UniRx】Buffer(2, 1)よりPairwiseの方が少しお得

Posted at

概要

UniRxには「流れてきたメッセージを溜めて、後でまとめて流す」ためのBufferというオペレータがあります。また、Pairwiseという「流れてきたメッセージを2つまとめて流す」オペレータもあります。
まずこれらそれぞれの動きと違いについて紹介し、Buffer(2,1)に対するPairwiseの優位性について説明します。
ついでに、せっかく内部実装を見るので少し改造したオペレータについても考えてみます。

長々書いてますが、一言で言えばBufferIList<T>で、Pairwiseは構造体Pair<T>というだけの話なので、それで「なるほどね」となる人なら得るものは無いです。

想定する読者層

  • UniRxはわかる、使える
  • オペレータ内部の実装にちょっと興味ある

説明しないこと

  • ReactiveExtensionやUniRxについて
  • Buffer, Pairwise以外のオペレータ

既存のBufferとPairwiseオペレータ

Buffer(個数指定)

「メッセージが指定した数溜まったらまとめて流し、古いメッセージを指定した数捨てる」という動きをするオペレータです。
捨てる数を指定しない場合、流したメッセージは全て捨てます。
雑に1から4のintを流してみます。

using System;
using System.Collections.Generic;
using UnityEngine;
using UniRx;

public class BufferSample : MonoBheaviour
{
    private void Start()
    {
        Observable.Range(1, 4)
            .Buffer(2, 1) // ここでメッセージが溜まる
            .Subscribe(bufferList => 
            {
                Debug.Log(String.Join(", ", bufferList));
            }
}

結果

1, 2
2, 3
3, 4
4

このように、Buffer(2, 1)を指定すると、「2個溜まったら流して、古い1個を捨てる」動きをします。最後に「4」が流れているのはOnComplete時の動作で、BufferOnCompleteで残っている中身を流す特徴があります。

Pairwise

「メッセージが2個溜まったらまとめて流す」という動きをするオペレータです。Bufferが「メッセージが指定した数溜まったらまとめて流す」オペレータなので、ほぼBuffer(2,1)と同じとなります。

using UnityEngine;
using UniRx;

public class PairwiseSample : MonoBheaviour
{
    private void Start()
    {
        Observable.Range(1, 4)
            .Pairwise()
            .Subscribe(pair => 
            {
                Debug.Log($"{pair.Previous}, {pair.Current}");
            }
}

結果

1, 2
2, 3
3, 4

Buffer(2,1)のときと違うのは、OnCompleteで「4」が流れてないところです。ペアで来ないと意味無いならPairwiseを使うのがいいでしょう。

Buffer(外部からメッセージ放流タイミングを指示)

さきほどは「メッセージが2個溜まる」ことがメッセージの放流タイミングでしたが、外部から放流を指示するまでメッセージを溜め続けることもできます。例えばダブルクリック場合は以下のように書くことができます。

using UnityEngine;
using UniRx;
using UniRx.Triggers;

public class DoubleClickSample : MonoBheaviour
{
    private void Start()
    {
        var click = this.UpdateAsObservable()  
            .Where(_ => Input.GetMouseButtonDown (0));

        click
            // 0.5秒経ったら放流。流したら溜まってるバッファを全削除。
            .Buffer(click.Throttle(TimeSpan.FromMilliseconds(500)))
            // クリックメッセージ2回以上溜まっていたらOK
            .Where(tap => tap.Count >= 2)
            .Subscribe(tap => Debug.Log ("だぶるくりっく!"));
}

Bufferの内部実装

Bufferがメッセージとして流す型はILIst<T>です。Bufferは内部にList<T>を持っており、OnNextではそれを流し、その後新しいListを生成しています。そのため、例えば、毎フレームメッセージが1つ流れてきて、それをBuffer(2,1)で受け止める場合を考えると、2フレームに1回Listがnewされることになります。

// Bufferの、溜める個数のみ指定した場合のObserverを一部抜粋
// 指定した個数溜まったら流して、新しいListを生成している。
// 捨てる個数も指定する場合はもうちょっと複雑(Listインスタンスが増える)
public override void OnNext(T value)
{
    list.Add(value);
    if (list.Count == parent.count)
    {
        observer.OnNext(list);
        list = new List<T>(parent.count);
    }
}

Pairwiseの内部実装

Pairwiseが流すメッセージはPair<T>という、T型を2つ持つだけ持つ構造体です。(GetHashCodeとかはちゃんと自前実装してある)
Pairwiseprev(直前のメッセージ)のみを保持して、今回のメッセージと合わせて構造体Pairを生成して流しています。その後、prevに今回のメッセージを上書きします。

// Observer一部抜粋。Pairを生成しているが、これは構造体。ヒープの確保は無い。
public override void OnNext(T value)
{
    if (isFirst)
    {
        isFirst = false;
        prev = value;
        return;
    }

    var pair = new Pair<T>(prev, value);
    prev = value;
    observer.OnNext(pair);
}

つまり、状況に適しているなら、Buffer(2,1)を使うよりPairwiseを使った方がお得そうです。(OnCompleteの違いはある)

スワイプ処理をPairwiseを使って実装してみる

タッチデバイスでスワイプを検知するクラスを、UniRxで実装してみます。
スワイプを、「前フレームのタッチ座標と現フレームのタッチ座標の差で検知する」ように作ります。メッセージが頻繁に来るとわかっており、また前後ペアでメッセージが来ないと意味がないのでBufferではなくPairwiseを選択します。

using UnityEngine;
using UniRx;
using UniRx.Triggers;

public class Swipe : MonoBehaviour
{
    private IObservable<Vector2> _onSwipe;
    public IObservable<Vector2> OnSwipe => _onSwipe ?? (_onSwipe = CreateSwipeStream());

    private IObservable<Vector2> CreateSwipeStream()
    {
        // 指の数が正しくなければ止めるためのストリーム
        var swipeEndStream = this.UpdateAsObservable().Where(_ => !IsCorrectFingerCount());

        return this.UpdateAsObservable()
            .Where(_ => IsCorrectFingerCount()) // 想定したタッチ本数以外なら無視する
            .Select(_ => Input.GetTouch(0)) // 今のタッチ座標を取る
            .Pairwise() // 前回の座標とペアにする
            .Select(positionPair => positionPair.Current - positionPair.Previous) // 前フレームとの差分を取る
            .TakeUntile(swipeEndStream) // 指本数が1本でなくなったらComplete
            .RepeatUntileDestroy(this); // 改めてSubscribe
            
        // 想定した指本数か
        bool IsCorrectFingerCount() => Input.touchCount == 1;
    }
}

これで、外部からはOnSwipeSubscribeすれば毎フレームのスワイプ距離を購読できます。
以上が、Buffer(2,1)よりPairwiseがいい、という話でした。
以降は主題から外れ、BufferPairwiseを改造する話です。

内部のバッファを削除できる改造Pairwiseを作ってみる

スワイプ処理でTakeUntil + RepeatUntileDestroyしている理由はPairwiseが保持しているバッファを削除したいからです。(スワイプが終わって、またスワイプを始めたときに、前回の終了位置と比較されては困るため。)ですが、TakeUntil + RepeatUntileDestroyだと、OnCompleteを発行して且つ改めてSubscribeすることでそれを実現しており、余計な処理が多い感じがします。処理的にも、読みやすさ的にも、素直に「Pairwiseの中にあるバッファを消す」と書けると嬉しいです。
しかしPairwiseにはそのようなオーバーロードはありません。が、UniRxはオペレータのベースクラスを公開しており、割と簡単にオペレータを自作できます。というわけで自作します。

内部のバッファを削除できる(ように見える)Pairwise

using System;
using UniRx;
using UniRx.Operators;

public static class PairwiseWithClearBuffer
{
    public static IObservable<Pair<TSource>> Pairwise<TSource, TClearMessage>(
        this IObservable<TSource> source,
        IObservable<TCrearMessage> clearMessages)
    {
        return new PairwiseObservable<TSource, TClearMessage>(source, clearMessages);
    }
}

public class PairwiseObservable<T, TClearMessage> : OperatorObservableBase<Pair<T>>
{
    private readonly IObservable<T> _source;
    private readonly IObservable<TClearMessage> _clearMessages;

    public PairwiseObservable(IObservable<T> source, IObservable<TClearMessage> clearMessages)
        : base(source.IsRequiredSubscribeOnCurrentThread())
    {
        _source = source;
        _clearMessages = clearMessages;
    }

    protected override IDisposable SubscribeCore(IObserver<Pair<T>> observer, IDisposable cancel)
    {
        return _source.Subscribe(new Pairwise(this, observer, cancel));
    }

    private class Pairwise : OperatorObserverBase<T, Pair<T>>
    {
        private T _prev;
        private bool _isFirst = true;

        public Pairwise(PairwiseObservable<T, TClearMessage> parent, IObserver<Pair<T>> observer, IDisposable cancel) : base(observer, cancel)
        {
            // バッファ削除指示が来たらisFirst扱いにする(なので実際には削除ではない)
            parent._clearMessages.Subscribe(_ => _isFirst = true);
        }

        public override void OnNext(T value)
        {
            if (_isFirst)
            {
                _isFirst = false;
                _prev = value;
                return;
            }

            var pair = new Pair<T>(prev, value);
            _prev = value;
            observer.OnNext(pair);
        }
            
        public override void OnError(Exception error)
        {
            try { observer.OnError(error); } finally { Dispose(); }
        }

        public override void OnCompleted()
        {
            try { observer.OnCompleted(); } finally { Dispose(); }
        }
    }
}

ほぼ既存のPairwiseのコピペで、_isFirstを書き換える指示を出せるようにしただけです。IObservebleによってその指示出しを行っている部分は、「放流タイミングを外部から指示できるBuffer」の実装に倣っています。

その他改善点

  • 1本指スワイプだけじゃなくて、複数指スワイプも取得したい。(1本指だけなら普通にUpdateで書けばいい気がするので、なんか差別化を図りたかった)

改善したスワイプ処理

public class Swipe : MonoBehaviour
{
    // n本指スワイプストリームの辞書(Key: 指本数 n, Value: ストリーム)
    private readonly Dictionary<int, IObservable<Vector2>> swipeStreamDic = new Dictionary<int, IObservable<Vector2>>();

    public IObservable<Vector2> GetSwipeStream(int fingerCount)
    {
        if (swipeStreamDic.TryGetValue(fingerCount, out var swipeStream)) { return swipeStream; }

        swipeStream = CreateSwipeStream(fingerCount);
        swipeStreamDic.Add(fingerCount, swipeStream);
        return swipeStream;
    }

    private IObservable<Vector2> CreateSwipeStream(int fingerCount)
    {
        // 指の数が正しくなければ一旦区切るためのストリーム
        var swipeBoundaryStream = this.UpdateAsObservable().Where(_ => !IsCorrectFingerCount());

        return this.UpdateAsObservable()
            .Where(_ => IsCorrectFingerCount()) // 想定した指本数以外なら無視する
            .Select(_ => CalculateCurrentPosition()) // 今のタッチ座標を取る
            .Pairwise(swipeBoundaryStream) // 前回の座標とペアにするが、指の本数が正しくなくなったらリセットする
            .Select(positionPair => positionPair.Current - positionPair.Previous) // 前フレームとの差分を取る
            .Share(); // Hot変換も一応
            
        // 想定した指本数か
        bool IsCorrectFingerCount() => Input.touchCount == fingerCount;

        // 現座標を計算する
        Vector2 CalculateCurrentPosition()
        {
            // 操作中は毎フレーム走る処理なのでfor文。全指タッチ座標の重心取ってる。
            var sum = Vector2.zero;
            for (var fingerIndex = 0; fingerIndex < fingerCount; fingerIndex++)
            {
                sum += Input.GetTouch(fingerIndex).position;
            }
            return sum / fingerCount;
        }
    }
}

GetSwipeStream(int fingerCount)の戻り値をSubscribeすれば、指定した指本数のスワイプを購読できます。

Bufferオペレータの改造について

既存のBufferオペレータの「毎回Listインスタンスを生成している」実装は悪いのか

全くそんなことはありません。むしろ、Bufferオペレータはこの実装でなければ困ります。

Listを改めて生成しない場合

OnNextListをnewせず、既存のListインスタンスの中身を書き換えて使いまわせば、最初だけGCが発生して、以降は避けられます。
個数指定の場合はキャパシティも指定できるのでAddでの内部配列の再配置も起こりません。
ですが、これは問題があります。
ご存じのように、Listは参照型です。以下のように、流れてきたメッセージを溜めこんでおきたい場合を考えます。

using System;
using System.Collections.Generic;
using UnityEngine;
using UniRx;

public class BufferTest : MonoBehaviour
{
    // 流れてきたメッセージを溜めこむ
    private List<IList<int>> _messageCache = new List<IList<int>>();

    private void Start()
    {
        Observable.Range(1, 4)
            .Buffer(2, 1)
            .Subscribe(_messageCache.Add);
        
        foreach(var message in _messageCache)
        {
            Debug.Log(String.Join(", ", message));
        }
    }
}

既存のBufferの場合

1,2
2,3
3,4
4

ListをnewしないBufferを自作して置き換えた場合
OnNextで流した後にインスタンスをキャッシュし使いまわす)


4

4

結果の詳細はどうでもいいです(実装に左右されるので)。大事なのは、当然ながら期待した結果にならないという点。

ライブラリの実装のあるべき形

ここで言いたいのは、既存のBufferの実装は

  • Bufferはメッセージを溜め込む、流す数が任意
  • 流したメッセージがどう使われるかわからない
  • Listは参照型である

ために、必要があってListを毎回生成しているということです。
「使用方法によっては期待した動作をしない」「普通に使うために内部実装を確認する必要がある」のはライブラリとしては厳しいです。よほどパフォーマンスが悪いなら考えものですが、Bufferは別にそこまでじゃないです(ダブルクリックくらいの頻度なら気にすることではない)。ちょっとしたパフォーマンス改善のために副作用が生じるのは優先順位を間違えているので、拡張はユーザーに任せるか、別の、詳細を調べるような人しか見ないメソッドを用意してあげるのが正しい姿かなと思います。

Listを毎回生成しないBufferオペレータを自作する場合

参照をキャッシュされると困るという動作上の副作用がある以上、Bufferみたいなシンプルな名前にはしない方がいいでしょう。
BufferRecycleMessageInstanceとか、副作用の部分を説明する語を追記して使用者に伝わる状態にすれば許容範囲かなと思います。(ネーミングセンスのNASA)

終わり

TakeUntil + RepeatUntileDestroyが嫌なのでPairwiseを自作したり、Bufferも、使い方によっては毎回のList生成を避ける手もあると説明しましたが、どうでしょうか。良さげだと思った人もいれば、「割に合わなくね?」と思った人もいるんじゃないかと思います。
恐らく本記事でハッキリ言えるのは

  • Pairwiseを使える状況ならBufferよりPairwiseが少しお得

くらいで、オペレータの自作の部分に関しては好みや開発の環境と相談して決めることかなと。
そもそも、UniRxが無いと綺麗な設計にならないなんてことは全く無いので、「オペレータの自作」が何らかの問題の解決手段として上位には来ないと思います。

参考

Buffer, Pairwiseの使用例はこちらの書籍のものがシンプルでわかりやすいなと思ったのでこちらからお借りしました。(他のと混ざってるのでちょっと違うけど、基本書籍です)

使う側の都合をよく考えるとかそのへんの意見は、自分の経験もありますが、こちらの影響があるかなと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?