Switchについて
参考文献では以下のように説明されています.
Switchは購読の対象とする Observableを切り替えることができるOperatorです.入力として複数のObservableを与えることができ、そのうちの最新のObservableについてのみ購読できるようになります。
SwitchはObservableが与えられたとき、すでに購読しているObservableがある場合はそれをキャンセルし新しいObservableへと購読を切り替えます.そしてそのObservableから発行されたメッセージをすべて後続へと素通しします.
つまりSwitchを用いることで、1つのObservableを維持したまま購読する対象を次へ次へと切り替えていくことができるようになります.
「マーブルダイアグラム」では以下のように表わされます.
出典: ReactiveX Documentation
この IObservable<IObservable<T>>
に対して購読する対象を切り替えていくということに具体的なイメージを持てていなかったため、上記の文献で紹介されているものも含めて使用例をいくつか調べてみました.
Switchの使用例
- 選択中のキャラクターの攻撃力
- 最後に接触したオブジェクトの追跡
- 入力に応じたリソース読み込み (インクリメンタルサーチ)
1.選択中のキャラクターの攻撃力
IObservable<IObservable<T>>
の一番分かりやすい例は入れ子になった ReactiveProperty
かと思います.ここでは、攻撃力を持つActorと、選択中のアクターを管理するActorSelectionControllerを考えます.
public class Actor {
public ReactiveProperty<int> AttackRP { get; } = new(0);
}
public class ActorSelectionController {
public ReactiveProperty<Actor > CurrentActorRP { get; } = new(null);
}
以下のコードでは、現在選択されているアクターのAttackRPをリアルタイムで監視し、値が更新された際にログを出力します。
public class TestMono_01 : MonoBehaviour {
private ActorSelectionController _actorSelectionController = new();
private void Start() {
_actorSelectionController.CurrentActorRP
.Where(actor => actor != null) // Null防止
.Select(actor => actor.AttackRP)
// 現在のActorのAttackRPだけを監視
.Switch()
.Subscribe(attack => Debug.Log($"Actor's Attack : {attack}"))
.AddTo(this);
// サンプルシナリオ
SimulateActorSwitching();
}
private void SimulateActorSwitching() {
// Actorの生成
var actor1 = new Actor();
var actor2 = new Actor();
// Actor1を選択
_actorSelectionController.CurrentActorRP.Value = actor1;
// Actor1のAttackRPを変更
actor1.AttackRP.Value = 1;
actor1.AttackRP.Value = 2;
actor1.AttackRP.Value = 3;
// Actor2を選択
_actorSelectionController.CurrentActorRP.Value = actor2;
// Actor2のAttackRPを変更
actor2.AttackRP.Value = 10;
actor2.AttackRP.Value = 20;
actor2.AttackRP.Value = 30;
// 再びActor1を選択
_actorSelectionController.CurrentActorRP.Value = actor1;
// Actor1のAttackRPを変更
actor1.AttackRP.Value = 5;
actor1.AttackRP.Value = 6;
actor1.AttackRP.Value = 7;
}
}
【実行結果】
Actor's Attack : 0 ← 切り替え (nullからActor1)
Actor's Attack : 1
Actor's Attack : 2
Actor's Attack : 3
Actor's Attack : 0 ← 切り替え (Actor1からActor2)
Actor's Attack : 10
Actor's Attack : 20
Actor's Attack : 30
Actor's Attack : 3 ← 切り替え (Actor2からActor1)
Actor's Attack : 5
Actor's Attack : 6
Actor's Attack : 7
注意点として AttackRP
への代入時に加えて CurrentActorRP
の切り替え時にも発行されています.これは前述の「購読する対象を切り替えていく」タイミングで発火されたもののため,AttackRPの値に関係なく(※Actor1,2のAttackRPが同じ値の場合でも)流れます.
SelectManayの場合
ちなみに今回の例で Select
+ Switch
の代わりに SelectManay
を用いた場合,古いObservableがキャンセルされないため,Actor1に再び切り替えた後は2回分の処理が実行されています.
_actorSelectionController.CurrentActorRP
.Where(actor => actor != null)
// 変更箇所
.SelectMany(actor => actor.AttackRP)
//.Select(actor => actor.AttackRP)
//.Switch()
.Subscribe(attack => Debug.Log($"Actor's Attack : {attack}"))
.AddTo(this);
【実行結果】
Actor's Attack : 0 ← 切り替え (nullからActor1)
Actor's Attack : 1
Actor's Attack : 2
Actor's Attack : 3
Actor's Attack : 0 ← 切り替え (Actor1からActor2)
Actor's Attack : 10
Actor's Attack : 20
Actor's Attack : 30
Actor's Attack : 3 ← 切り替え (Actor2からActor1)
Actor's Attack : 5 ← ※以下は2回づつ処理が走っている
Actor's Attack : 5 ←
Actor's Attack : 6 ←
Actor's Attack : 6 ←
Actor's Attack : 7 ←
Actor's Attack : 7 ←
2. 最後に接触した対象の追跡
こちらは Observable<T1>
のT1から新しい Observable<T2>
を作っていくパターンです.
以下では IObservable<Collider>
に対してColliderから対象の座標を毎フレーム通知する IObservable<Vector3>
を生成しています.これに Switch
オペレータ を適用して最新のもののみ監視を行います.
public class TestMono_02 : MonoBehaviour {
private void Start() {
var targetObservable =
// IObservable<Collider>型
this.OnCollisionEnterAsObservable()
// IObservable<IObservable<Vector3>>型
// (※ColliderからIObservable<Vector3>を作っている)
.Select(collider => CreatePositionObservable(collider.gameObject));
targetObservable
// 最後に接触したColliderのみを監視
.Switch()
.Subscribe(targetPos => {
// 対象を追跡
var newPosition = Vector3.Lerp(transform.position, targetPos, Time.deltaTime);
transform.position = newPosition;
})
.AddTo(this);
}
//
private IObservable<Vector3> CreatePositionObservable(GameObject target) {
return target.UpdateAsObservable() // 毎フレーム監視
.Select(_ => target.transform.position);
}
}
3. 入力に応じたリソース読み込み (インクリメンタルサーチ)
先ほどはある型 T
から Observable
を直接作成していましたが,こちらは T
を入力として非同期処理を実行してそれを Observable
に変換するパターンです.
例ではInputField.OnValueChangedAsObservable
で入力値(リソースキー)の変化を検知して,非同期でスプライトを読み込みます.
// スプライト読み込み
private async UniTask<Sprite> LoadSpriteAsync(string resourcePath, CancellationToken token = default) {
var sprite = await Resources.LoadAsync<Sprite>(resourcePath) as Sprite;
// 2秒程かかると仮定
await UniTask.WaitForSeconds(2f, cancellationToken: token);
return sprite;
}
3-1. UniTask.ToObservable()
UniTaskには Observable
へ変換するための拡張メソッドが用意されています.
public class ObservableSwitchTest : MonoBehaviour {
[SerializeField] private TextMeshProUGUI _logText = null;
[SerializeField] private TMP_InputField _inputField = null;
[SerializeField] private Image _image = null;
private void Start() {
_inputField.OnValueChangedAsObservable()
// 値が変化するたびに読み込み処理を非同期実行
.Select(path => LoadSpriteAsync(path).ToObservable())
// 最新のIObservableに切り替える
.Switch()
.Subscribe(sprite => _image.sprite = sprite)
.AddTo(this);
}
}
Switch
オペレータによって IObservable<IObservable<T>>
の最新のものに対してのみ処理が実行できています (古い Observable
は Dispose されている). しかし、実行した非同期処理 (UniTask) は裏で走り続けてしまっています.
UniTask.ToObservable()
ではCancellationを渡せないため,そのままでは
またSwitchオペレータに限らず,UniRxで非同期コールバックを扱う時にはそれらをどのように扱うかは頻出する問題のようです.余談ですがこの辺のRxとasync/awaitの連携がR3では改善されているようです.(まだR3は触れていませんが)
https://developer.aiming-inc.com/csharp/unity-csharp-async-callback-patterns/
https://developer.aiming-inc.com/csharp/post-10773/
3-2. ObservableConverter.FromAsync()を実装して用いる
とりあえず,この問題へ対処するために以下の記事を参考にさせていただきました.記事ではパフォーマンスを考慮して「UniRxパッケージにメソッドを追加する方法」も示されていますが,少しハードルが高いため簡単な「staticメソッドを実装する方法」の方を採用しました.
以下は実装した ObservableConverter.FromUniTask()
を用いた場合の実行結果です.
private void Start() {
// ObservableConverter.FromUniTaskを使用する形に変更
_inputField.OnValueChangedAsObservable()
// 非同期読み込み処理を実行 (※CancellationDisposableを返す)
.Select(path => ObservableConverter.FromUniTask(ct => LoadSpriteAsync(path, ct)))
//.Select(path => LoadSpriteAsync(path).ToObservable())
// 最新のIObservableに切り替える
.Switch()
.Subscribe(sprite => _image.sprite = sprite)
.AddTo(this);
}
最新の Observable
の非同期処理のみ実行されていることが確認できました.
【参考】