概要
UniRxには「流れてきたメッセージを溜めて、後でまとめて流す」ためのBuffer
というオペレータがあります。また、Pairwise
という「流れてきたメッセージを2つまとめて流す」オペレータもあります。
まずこれらそれぞれの動きと違いについて紹介し、Buffer(2,1)
に対するPairwise
の優位性について説明します。
ついでに、せっかく内部実装を見るので少し改造したオペレータについても考えてみます。
長々書いてますが、一言で言えばBuffer
はIList<T>
で、Pairwise
は構造体Pair<T>
というだけの話なので、それで「なるほどね」となる人なら得るものは無いです。
想定する読者層
- UniRxはわかる、使える
- オペレータ内部の実装にちょっと興味ある
説明しないこと
- ReactiveExtensionやUniRxについて
- Buffer, Pairwise以外のオペレータ
既存のBufferとPairwiseオペレータ
Buffer(個数指定)
「メッセージが指定した数溜まったらまとめて流し、古いメッセージを指定した数捨てる」という動きをするオペレータです。
捨てる数を指定しない場合、流したメッセージは全て捨てます。
雑に1から4のintを流してみます。
using System;
using System.Collections.Generic;
using UnityEngine;
using UniRx;
public class BufferSample : MonoBheaviour
{
private void Start()
{
Observable.Range(1, 4)
.Buffer(2, 1) // ここでメッセージが溜まる
.Subscribe(bufferList =>
{
Debug.Log(String.Join(", ", bufferList));
}
}
結果
1, 2
2, 3
3, 4
4
このように、Buffer(2, 1)
を指定すると、「2個溜まったら流して、古い1個を捨てる」動きをします。最後に「4」が流れているのはOnComplete
時の動作で、Buffer
はOnComplete
で残っている中身を流す特徴があります。
Pairwise
「メッセージが2個溜まったらまとめて流す」という動きをするオペレータです。Buffer
が「メッセージが指定した数溜まったらまとめて流す」オペレータなので、ほぼBuffer(2,1)
と同じとなります。
using UnityEngine;
using UniRx;
public class PairwiseSample : MonoBheaviour
{
private void Start()
{
Observable.Range(1, 4)
.Pairwise()
.Subscribe(pair =>
{
Debug.Log($"{pair.Previous}, {pair.Current}");
}
}
結果
1, 2
2, 3
3, 4
Buffer(2,1)
のときと違うのは、OnComplete
で「4」が流れてないところです。ペアで来ないと意味無いならPairwise
を使うのがいいでしょう。
Buffer(外部からメッセージ放流タイミングを指示)
さきほどは「メッセージが2個溜まる」ことがメッセージの放流タイミングでしたが、外部から放流を指示するまでメッセージを溜め続けることもできます。例えばダブルクリック場合は以下のように書くことができます。
using UnityEngine;
using UniRx;
using UniRx.Triggers;
public class DoubleClickSample : MonoBheaviour
{
private void Start()
{
var click = this.UpdateAsObservable()
.Where(_ => Input.GetMouseButtonDown (0));
click
// 0.5秒経ったら放流。流したら溜まってるバッファを全削除。
.Buffer(click.Throttle(TimeSpan.FromMilliseconds(500)))
// クリックメッセージ2回以上溜まっていたらOK
.Where(tap => tap.Count >= 2)
.Subscribe(tap => Debug.Log ("だぶるくりっく!"));
}
Bufferの内部実装
Buffer
がメッセージとして流す型はILIst<T>
です。Buffer
は内部にList<T>
を持っており、OnNext
ではそれを流し、その後新しいList
を生成しています。そのため、例えば、毎フレームメッセージが1つ流れてきて、それをBuffer(2,1)
で受け止める場合を考えると、2フレームに1回List
がnewされることになります。
// Bufferの、溜める個数のみ指定した場合のObserverを一部抜粋
// 指定した個数溜まったら流して、新しいListを生成している。
// 捨てる個数も指定する場合はもうちょっと複雑(Listインスタンスが増える)
public override void OnNext(T value)
{
list.Add(value);
if (list.Count == parent.count)
{
observer.OnNext(list);
list = new List<T>(parent.count);
}
}
Pairwiseの内部実装
Pairwise
が流すメッセージはPair<T>
という、T型を2つ持つだけ持つ構造体です。(GetHashCode
とかはちゃんと自前実装してある)
Pairwise
はprev
(直前のメッセージ)のみを保持して、今回のメッセージと合わせて構造体Pair
を生成して流しています。その後、prev
に今回のメッセージを上書きします。
// Observer一部抜粋。Pairを生成しているが、これは構造体。ヒープの確保は無い。
public override void OnNext(T value)
{
if (isFirst)
{
isFirst = false;
prev = value;
return;
}
var pair = new Pair<T>(prev, value);
prev = value;
observer.OnNext(pair);
}
つまり、状況に適しているなら、Buffer(2,1)
を使うよりPairwise
を使った方がお得そうです。(OnCompleteの違いはある)
スワイプ処理をPairwiseを使って実装してみる
タッチデバイスでスワイプを検知するクラスを、UniRxで実装してみます。
スワイプを、「前フレームのタッチ座標と現フレームのタッチ座標の差で検知する」ように作ります。メッセージが頻繁に来るとわかっており、また前後ペアでメッセージが来ないと意味がないのでBuffer
ではなくPairwise
を選択します。
using UnityEngine;
using UniRx;
using UniRx.Triggers;
public class Swipe : MonoBehaviour
{
private IObservable<Vector2> _onSwipe;
public IObservable<Vector2> OnSwipe => _onSwipe ?? (_onSwipe = CreateSwipeStream());
private IObservable<Vector2> CreateSwipeStream()
{
// 指の数が正しくなければ止めるためのストリーム
var swipeEndStream = this.UpdateAsObservable().Where(_ => !IsCorrectFingerCount());
return this.UpdateAsObservable()
.Where(_ => IsCorrectFingerCount()) // 想定したタッチ本数以外なら無視する
.Select(_ => Input.GetTouch(0)) // 今のタッチ座標を取る
.Pairwise() // 前回の座標とペアにする
.Select(positionPair => positionPair.Current - positionPair.Previous) // 前フレームとの差分を取る
.TakeUntile(swipeEndStream) // 指本数が1本でなくなったらComplete
.RepeatUntileDestroy(this); // 改めてSubscribe
// 想定した指本数か
bool IsCorrectFingerCount() => Input.touchCount == 1;
}
}
これで、外部からはOnSwipe
をSubscribe
すれば毎フレームのスワイプ距離を購読できます。
以上が、Buffer(2,1)
よりPairwise
がいい、という話でした。
以降は主題から外れ、Buffer
やPairwise
を改造する話です。
内部のバッファを削除できる改造Pairwiseを作ってみる
スワイプ処理でTakeUntil + RepeatUntileDestroy
している理由はPairwise
が保持しているバッファを削除したいからです。(スワイプが終わって、またスワイプを始めたときに、前回の終了位置と比較されては困るため。)ですが、TakeUntil + RepeatUntileDestroy
だと、OnComplete
を発行して且つ改めてSubscribe
することでそれを実現しており、余計な処理が多い感じがします。処理的にも、読みやすさ的にも、素直に「Pairwise
の中にあるバッファを消す」と書けると嬉しいです。
しかしPairwise
にはそのようなオーバーロードはありません。が、UniRxはオペレータのベースクラスを公開しており、割と簡単にオペレータを自作できます。というわけで自作します。
内部のバッファを削除できる(ように見える)Pairwise
using System;
using UniRx;
using UniRx.Operators;
public static class PairwiseWithClearBuffer
{
public static IObservable<Pair<TSource>> Pairwise<TSource, TClearMessage>(
this IObservable<TSource> source,
IObservable<TCrearMessage> clearMessages)
{
return new PairwiseObservable<TSource, TClearMessage>(source, clearMessages);
}
}
public class PairwiseObservable<T, TClearMessage> : OperatorObservableBase<Pair<T>>
{
private readonly IObservable<T> _source;
private readonly IObservable<TClearMessage> _clearMessages;
public PairwiseObservable(IObservable<T> source, IObservable<TClearMessage> clearMessages)
: base(source.IsRequiredSubscribeOnCurrentThread())
{
_source = source;
_clearMessages = clearMessages;
}
protected override IDisposable SubscribeCore(IObserver<Pair<T>> observer, IDisposable cancel)
{
return _source.Subscribe(new Pairwise(this, observer, cancel));
}
private class Pairwise : OperatorObserverBase<T, Pair<T>>
{
private T _prev;
private bool _isFirst = true;
public Pairwise(PairwiseObservable<T, TClearMessage> parent, IObserver<Pair<T>> observer, IDisposable cancel) : base(observer, cancel)
{
// バッファ削除指示が来たらisFirst扱いにする(なので実際には削除ではない)
parent._clearMessages.Subscribe(_ => _isFirst = true);
}
public override void OnNext(T value)
{
if (_isFirst)
{
_isFirst = false;
_prev = value;
return;
}
var pair = new Pair<T>(prev, value);
_prev = value;
observer.OnNext(pair);
}
public override void OnError(Exception error)
{
try { observer.OnError(error); } finally { Dispose(); }
}
public override void OnCompleted()
{
try { observer.OnCompleted(); } finally { Dispose(); }
}
}
}
ほぼ既存のPairwise
のコピペで、_isFirst
を書き換える指示を出せるようにしただけです。IObserveble
によってその指示出しを行っている部分は、「放流タイミングを外部から指示できるBuffer
」の実装に倣っています。
その他改善点
- 1本指スワイプだけじゃなくて、複数指スワイプも取得したい。(1本指だけなら普通にUpdateで書けばいい気がするので、なんか差別化を図りたかった)
改善したスワイプ処理
public class Swipe : MonoBehaviour
{
// n本指スワイプストリームの辞書(Key: 指本数 n, Value: ストリーム)
private readonly Dictionary<int, IObservable<Vector2>> swipeStreamDic = new Dictionary<int, IObservable<Vector2>>();
public IObservable<Vector2> GetSwipeStream(int fingerCount)
{
if (swipeStreamDic.TryGetValue(fingerCount, out var swipeStream)) { return swipeStream; }
swipeStream = CreateSwipeStream(fingerCount);
swipeStreamDic.Add(fingerCount, swipeStream);
return swipeStream;
}
private IObservable<Vector2> CreateSwipeStream(int fingerCount)
{
// 指の数が正しくなければ一旦区切るためのストリーム
var swipeBoundaryStream = this.UpdateAsObservable().Where(_ => !IsCorrectFingerCount());
return this.UpdateAsObservable()
.Where(_ => IsCorrectFingerCount()) // 想定した指本数以外なら無視する
.Select(_ => CalculateCurrentPosition()) // 今のタッチ座標を取る
.Pairwise(swipeBoundaryStream) // 前回の座標とペアにするが、指の本数が正しくなくなったらリセットする
.Select(positionPair => positionPair.Current - positionPair.Previous) // 前フレームとの差分を取る
.Share(); // Hot変換も一応
// 想定した指本数か
bool IsCorrectFingerCount() => Input.touchCount == fingerCount;
// 現座標を計算する
Vector2 CalculateCurrentPosition()
{
// 操作中は毎フレーム走る処理なのでfor文。全指タッチ座標の重心取ってる。
var sum = Vector2.zero;
for (var fingerIndex = 0; fingerIndex < fingerCount; fingerIndex++)
{
sum += Input.GetTouch(fingerIndex).position;
}
return sum / fingerCount;
}
}
}
GetSwipeStream(int fingerCount)
の戻り値をSubscribe
すれば、指定した指本数のスワイプを購読できます。
Bufferオペレータの改造について
既存のBufferオペレータの「毎回Listインスタンスを生成している」実装は悪いのか
全くそんなことはありません。むしろ、Buffer
オペレータはこの実装でなければ困ります。
Listを改めて生成しない場合
OnNext
でList
をnewせず、既存のList
インスタンスの中身を書き換えて使いまわせば、最初だけGCが発生して、以降は避けられます。
個数指定の場合はキャパシティも指定できるのでAdd
での内部配列の再配置も起こりません。
ですが、これは問題があります。
ご存じのように、List
は参照型です。以下のように、流れてきたメッセージを溜めこんでおきたい場合を考えます。
using System;
using System.Collections.Generic;
using UnityEngine;
using UniRx;
public class BufferTest : MonoBehaviour
{
// 流れてきたメッセージを溜めこむ
private List<IList<int>> _messageCache = new List<IList<int>>();
private void Start()
{
Observable.Range(1, 4)
.Buffer(2, 1)
.Subscribe(_messageCache.Add);
foreach(var message in _messageCache)
{
Debug.Log(String.Join(", ", message));
}
}
}
既存のBufferの場合
1,2
2,3
3,4
4
List
をnewしないBuffer
を自作して置き換えた場合
(OnNext
で流した後にインスタンスをキャッシュし使いまわす)
4
4
結果の詳細はどうでもいいです(実装に左右されるので)。大事なのは、当然ながら期待した結果にならないという点。
ライブラリの実装のあるべき形
ここで言いたいのは、既存のBuffer
の実装は
-
Buffer
はメッセージを溜め込む、流す数が任意 - 流したメッセージがどう使われるかわからない
-
List
は参照型である
ために、必要があってList
を毎回生成しているということです。
「使用方法によっては期待した動作をしない」「普通に使うために内部実装を確認する必要がある」のはライブラリとしては厳しいです。よほどパフォーマンスが悪いなら考えものですが、Buffer
は別にそこまでじゃないです(ダブルクリックくらいの頻度なら気にすることではない)。ちょっとしたパフォーマンス改善のために副作用が生じるのは優先順位を間違えているので、拡張はユーザーに任せるか、別の、詳細を調べるような人しか見ないメソッドを用意してあげるのが正しい姿かなと思います。
Listを毎回生成しないBufferオペレータを自作する場合
参照をキャッシュされると困るという動作上の副作用がある以上、Buffer
みたいなシンプルな名前にはしない方がいいでしょう。
BufferRecycleMessageInstance
とか、副作用の部分を説明する語を追記して使用者に伝わる状態にすれば許容範囲かなと思います。(ネーミングセンスのNASA)
終わり
TakeUntil + RepeatUntileDestroy
が嫌なのでPairwise
を自作したり、Buffer
も、使い方によっては毎回のList
生成を避ける手もあると説明しましたが、どうでしょうか。良さげだと思った人もいれば、「割に合わなくね?」と思った人もいるんじゃないかと思います。
恐らく本記事でハッキリ言えるのは
- Pairwiseを使える状況ならBufferよりPairwiseが少しお得
くらいで、オペレータの自作の部分に関しては好みや開発の環境と相談して決めることかなと。
そもそも、UniRxが無いと綺麗な設計にならないなんてことは全く無いので、「オペレータの自作」が何らかの問題の解決手段として上位には来ないと思います。
参考
Buffer
, Pairwise
の使用例はこちらの書籍のものがシンプルでわかりやすいなと思ったのでこちらからお借りしました。(他のと混ざってるのでちょっと違うけど、基本書籍です)
使う側の都合をよく考えるとかそのへんの意見は、自分の経験もありますが、こちらの影響があるかなと思います。