前提
「R3」とは、ReactiveExtensionsの最新の環境のC#に合わせて再構築したライブラリです。Unityでは「UniRx」というライブラリがありましたが、大雑把にいえば「UniRxを最新の環境にあわせてリメイク」したものという認識でよいでしょう。
詳しくは別記事でまとめてあります。
またこの記事執筆時点での環境は次のとおりです。
- Unity - 2023.1.14f1
- R3 - 1.0.0
- ObservableCollections - 2.0.1
- NuGetForUnity - 4.0.2
今回の概要
「UniRx」と「R3」の機能の比較、R3での新機能や廃止された機能、UniRxからR3に置き換えるときの代替などについて紹介します。(細かい部分まですべては拾いきれないので、紹介漏れはご容赦ください。またUnity向けでない機能などは省略しています)
この記事中に登場するサンプルコードは別途記載がない限りはCC0です。自由にコピペして使ってください。
(ただし発生したトラブルや問題については責任を負いません)
またサンプルプロジェクトはGitHubにて公開しています。
動作環境などの違い
最低Unityバージョン
UniRx
UniRxでは最低バージョンは特に存在せず、Unity 2017頃のかなり古いUnityバージョンでも動作します。
R3
R3では最低でもUnity 2021.3以上である必要があります。
補足: destroyCancellationToken
R3ではキャンセルをCancellationToken
によって管理する仕組みになっています。
Unity 2022.2以降であればMonoBehaviour
上でdestroyCancellationTokenが利用できるため、こちらを使うのが便利です。
Unity 2022.2未満の場合は代わりにR3が提供するObservableDestroyTrigger
を使うことで同等の機能が得られます。
(Unity2022.2以降でObservableDestroyTrigger
を使っても問題はありません)
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
public class DestroySample : MonoBehaviour
{
private void Start()
{
// Unity 2022.2以降ならdestroyCancellationTokenが使える
Observable
.Timer(TimeSpan.FromSeconds(3), destroyCancellationToken)
.Subscribe();
// destroyCancellationTokenが使えない場合はGetCancellationTokenOnDestroyを代わりに使えばOK
// (Unity2022.2以降なら内部的にdestroyCancellationTokenを返すだけなのでノーコスト)
Observable
.Timer(TimeSpan.FromSeconds(3), this.GetCancellationTokenOnDestroy())
.Subscribe();
}
}
導入方法
UniRx
UniRx次のいずれかの方法で導入する必要があります。
- unitypackage
- UPM(Git)
- UPM(OpenUPM)
R3
R3では「コアモジュール」と「Unityプラグイン」の2つに別れており、Unityでフル機能を動作させる場合はこの両者をインストールする必要があります。R3の公式ドキュメントに両者のインストール方法が記載されています。
コアモジュールはNuget経由でのインストールが必要なため、NugetForUnity
を使う方法が推奨されます。Unity用プラグイン(R3.Unity
)はGit経由でUnityPackageManagerからインストールしてください。
詳しくは公式ドキュメントを参照してください。
なおasmdefでモジュール管理をしている場合は、R3.Unity
を参照に追加してください。
根本的な挙動の違い
R3ではObservable
の概念が根本から見直されているため、UniRxと挙動が大きく異なります。
-
OnError
メッセージがOnErrorResume
メッセージに変更された -
OnCompleted
メッセージ発行時に「正常終了」か「異常終了(例外込み)」かを選べるようになった - すべての
Observable
はOnCompleted
メッセージを最後に発行できるようになった -
Scheduler
が廃止された -
async/await
との連携がやりやすくなった -
CancellationToken
で制御しやすくなった
こちらについては別の記事ですでに解説済みですので次の記事を参照してください。
大きな変更点はREADMEに書いてある
R3のREADMEに差分が書いてあるので、まずはそちらを読みましょう。
UniRxからR3に移行しても(ほぼ)そのまま使える機能
UniRxでよく使われていた機能はR3にも存在します。そのため次の機能についてはR3に以降してもほぼ同じように使えます。
Trigger(MonoBehaviourのイベント変換)
UniRxに存在したMonoBehaviourのイベントをObservable
に変換する機能(Trigger
)ですが、R3でも利用可能です。
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class TriggerSample : MonoBehaviour
{
private void Start()
{
// このGameObjectに紐づいたOnCollisionEnterをObservableとして取得できる
this.OnCollisionEnterAsObservable()
.Subscribe(collision =>
{
Debug.Log("OnCollisionEnter: " + collision.gameObject.name);
});
// Update()をObservableとして取得できる
this.UpdateAsObservable()
.Subscribe(_ =>
{
Debug.Log("Update!");
});
// 他にもいろいろある
}
}
}
AddTo(MonoBehaviour)
MonoBehaviour
の寿命にIDisposable
を連動させるAddTo(this)
ですが、R3でも使えます。
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class AddToSample : MonoBehaviour
{
[SerializeField] private GameObject _childObject;
private void Start()
{
// childObjectに紐づいたOnCollisionEnterをObservableとして取得
_childObject
.OnCollisionEnterAsObservable()
.Subscribe(collision =>
{
Debug.Log("OnCollisionEnter: " + collision.gameObject.name);
})
// Observableの寿命をこのMonoBehaviourに紐付ける
.AddTo(this);
}
}
}
uGUIコンポーネントのイベント変換
UnityEngine.UI.Button
などのいわゆる「uGUI」のイベントをObservable
に変換する機能はUniRxから引き続き利用可能です。
どのようなイベントが利用可能かはこちらを参照してください。
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class GuiEventSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private InputField _inputField;
[SerializeField] private Slider _slider;
[SerializeField] private Text _text;
private void Start()
{
// ボタンのクリック
_button
.OnClickAsObservable()
.Subscribe(_ => Debug.Log("Button Clicked!"))
.AddTo(this);
// InputFieldのテキスト変更
_inputField.OnValueChangedAsObservable()
.Subscribe(txt => Debug.Log("InputField Text: " + txt))
.AddTo(this);
// Sliderの値変更
_slider.OnValueChangedAsObservable()
.Subscribe(v => Debug.Log("Slider Value: " + v))
.AddTo(this);
// InputFieldのテキストをTextに反映
_inputField.OnValueChangedAsObservable()
.SubscribeToText(_text)
.AddTo(this);
}
}
}
R3での新機能(UniRxには無かった機能)
[新機能] SubscribeAwait/SelectAwait/WhereAwait
次世代Rx「R3」解説でも解説しましたが、Subscribe
やSelect
/Where
でasync/await
を併用できる版が追加されました。R3の場合はasync/await
の完了とメッセージ処理をいい感じに制御してくれます。(UniTaskと組み合わせるとさらに便利!)
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class SubscribeAwaitSample1 : MonoBehaviour
{
[SerializeField] private Button _goButton;
private void Start()
{
// ボタンが押されたら1秒間前進する
// 連打された場合はその数だけ進む
_goButton.OnClickAsObservable()
.SubscribeAwait(async (_, ct) =>
{
var time = Time.time;
while (Time.time - time < 1f)
{
transform.position += Vector3.forward * Time.deltaTime;
await UniTask.Yield(ct);
}
},
AwaitOperation.Sequential,
// configureAwaitはtrueから変更しないことを推奨
configureAwait: true)
.AddTo(this);
}
}
}
なお、configureAwaitはTrueを指定しましょう。(デフォルトTrueです)
Falseを指定した場合、実行コンテキストが意図せずにスレッドプールへ切り替わってしまう場合があります。
また、AwaitOperation
というパラメータを指定することで非同期処理の実行中(await
の処理が終わる前)に次のメッセージが到達してしまったときの挙動を調整することができます。
AwaitOperation |
await 中に次のイベントが来たときの挙動 |
備考 |
---|---|---|
Sequential | 今実行中の処理を優先。余剰なイベントはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。 | |
Drop | 今実行中の処理を優先。余剰なイベントは無視してなかったことにする。 | |
Switch | 今実行中の非同期処理をキャンセル。 新しく到達したイベントの処理を優先して開始する。 | キャンセル処理はCancellationToken を使って自分で実装する必要がある。 |
Parallel | 新しく来たイベントを即座に処理する。処理が終わったものから早いもの勝ちで出力される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
SequentialParallel※ | 新しく来たイベントを即座に処理する。処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
ThrottleFirstLast | 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 |
ThrottleFirst とThrottleLast が合体した挙動 |
※ SequentialParallelはWhereAwait/SelectAwaitでのみ利用可
(補足)UniRxだとどういう挙動をしていたか
UniRxだとどういう挙動をしていたか
[新機能] Debounce/ThrottleFirst/ThrottleLastの非同期対応
Debounce
(旧名Throttle
)/ThrottleFirst
/ThrottleLast
(旧名Sample
)はUniRxにもあったオペレータですが、R3では非同期処理に対応しました。つまりasync/await
と併用ができます。
それぞれの非同期版の挙動は次のとおりです。
-
Debounce
:メッセージが到達したら非同期処理を実行。非同期処理が完遂したらそのメッセージを発行する。非同期処理中に次のメッセージが来た場合は実行中に非同期処理をキャンセルして再び非同期処理を実行しなおす。 -
ThrottleFirst
:メッセージが到達したらそれを通過させた後に非同期処理を実行、その処理が終わるまでメッセージを遮断する -
ThrottleLast
:メッセージが到達したら非同期処理を実行してメッセージを遮断、その処理が終わった時最後に届いていたメッセージを1つだけ発行する」
Debounce
とThrottleLast
の違いは非同期処理をやり直すか完遂するかの違いです。Debounce
はメッセージが来るたびに非同期処理をやり直し。ThrottleLast
は一度走り始めたらそれが完遂するまでやりきります。
Debounceの例
Debounceを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class DebounceSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// Debounceで遮断
.Debounce((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
ThrottleFirstの例
ThrottleFirstを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class ThrottleFirstSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// ThrottleFirstで遮断
.ThrottleFirst((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
ThrottleLastの例
ThrottleLastを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class ThrottleLastSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// ThrottleLastで遮断
.ThrottleLast((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
[新機能] IDisposable.RegisterTo(CancellationToken)
IDisposable
に対する拡張メソッドとしてRegisterTo
が定義されています。これにより指定のCancellationToken
にIDisposable.Dispose()
を連動させることができます。
機能自体はUniTaskにも存在したIDisposable.AddTo(CancellationToken)
と同じです。(名前がUniTaskとR3で衝突したのでRegisterTo
に変えた、とのこと)
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class RegisterToSample : MonoBehaviour
{
private void Start()
{
// このObservableの寿命をCancellationTokenに連動させる
this.UpdateAsObservable()
.Subscribe(_ => Debug.Log("Update!"))
.RegisterTo(destroyCancellationToken);
}
}
}
[新機能] ReplayFrameSubject
指定した期間分だけ発行されたメッセージをキャッシュしてくれるReplaySubject
のフレーム指定版が登場しました。後述するFrameProvider
と組み合わせることで「過去一定フレーム以内に発行されたメッセージをすべてキャッシュする」といった使い方ができます。メッセージを発行するタイミングとSubscribe()
をするタイミングがズレている場合などに使うと便利です。
// 例:過去FixedUpdate()10フレーム分の期間に発行されたメッセージをすべてキャッシュするSubject
var replayFrameSubject = new ReplayFrameSubject<Unit>(window: 10, UnityFrameProvider.FixedUpdate);
[新機能] Observable Tracker
Observable Trackerは購読中のObservable
をUnityEditor上で可視化できるツールです。控えめに言ってもすごくよいです。Observable
は購読の解除漏れがあったときそれに気づきにくいのが問題でしたが、Observable Trackerを使うことで解決します。
どのObservable
がどのスタックで動いており、いつから稼働しているかなどを一覧で可視化することができます。
[新機能] SerializableReactiveProperty<T>
SerializableReactiveProperty<T>
を使うことで、ReactiveProperty
をそのままUnityのインスペクターウィンドウに表示できるようになりました。UniRxでは任意の型を表示したいときにEditor拡張を用意する必要がありましたが、R3ではジェネリックをそのまま使うことができます。
public class NewBehaviourScript : MonoBehaviour
{
public SerializableReactiveProperty<int> rpInt;
public SerializableReactiveProperty<long> rpLong;
public SerializableReactiveProperty<byte> rpByte;
public SerializableReactiveProperty<float> rpFloat;
public SerializableReactiveProperty<double> rpDouble;
public SerializableReactiveProperty<string> rpString;
public SerializableReactiveProperty<bool> rpBool;
public SerializableReactiveProperty<Vector2> rpVector2;
public SerializableReactiveProperty<Vector2Int> rpVector2Int;
public SerializableReactiveProperty<Vector3> rpVector3;
public SerializableReactiveProperty<Vector3Int> rpVector3Int;
public SerializableReactiveProperty<Vector4> rpVector4;
public SerializableReactiveProperty<Color> rpColor;
public SerializableReactiveProperty<Rect> rpRect;
public SerializableReactiveProperty<Bounds> rpBounds;
public SerializableReactiveProperty<BoundsInt> rpBoundsInt;
public SerializableReactiveProperty<Quaternion> rpQuaternion;
public SerializableReactiveProperty<Matrix4x4> rpMatrix4x4;
public SerializableReactiveProperty<FruitEnum> rpEnum;
public SerializableReactiveProperty<FruitFlagsEnum> rpFlagsEnum;
}
[新機能] LiveList
R3にはLiveList<T>
というObservable<T>
から変換可能なコレクションが用意されています。LiveList<T>
は「Observable<T>
を購読し、発行されたメッセージを自動的にリストに追加する」という挙動をします。
テスト時などに活用すると便利です。
using System;
using NUnit.Framework;
using R3;
namespace Samples.R3Tests
{
public class LiveListSample
{
[Test]
public void LiveListが便利()
{
using var subject = new Subject<int>();
// Observable -> LiveList
using var liveList = subject.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 「1」を発行
subject.OnNext(1);
// 発行された「1」が反映されている
CollectionAssert.AreEqual(new[] { 1 }, liveList);
subject.OnNext(2);
subject.OnNext(3);
// 3つの値が反映されている
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
}
}
なおLiveList
をDispose()
するとObservable
の購読も同時に終了します。
[新機能] Observableのユニットテストサポート(FakeFrameProvider)
話が前後してしまいますが、R3ではScheduler
の概念が廃止され代わりにTimeProvider
/FrameProvider
により時間の制御が行われています。そのうちFrameProvider
のデバッグ用実装がFakeFrameProvider
です。これを用いることでObservable
のテストが書きやすくなります。
なお、TimeProvider
のデバッグ用実装であるFakeTimeProvider
はMicrosoft.Extensions.Time.Testing
パッケージとして公開されており、こちらは別途Nuget経由で導入する必要があります。
メモ:FakeTimeProviderの導入方法
FakeTimeProviderの導入方法
FakeTimeProvider
を使いたい場合はMicrosoft.Extensions.TimeProvider.Testing
パッケージを導入する必要があります。
ただこちらのパッケージですが、NugetForUnity
から導入するとMicrosoft.Bcl.AsyncInterfaces
とUnityが干渉してうまく導入できないことがあります。
そのため現時点ではNuGetのWebページから.NET Framework 4.6.2
向けのdllを手動ででDLして直接プロジェクトに入れてしまう方法が推奨です。
テスト例
下準備としてテストアセンブリに
- R3.Unity
- R3.dll
- Microsoft.Bcl.TimeProvider.dll
- Microsoft.Extensions.TimeProvider.Testing.dll
を登録してください。
その上で、次のようなテストを書くことでTimeProvider
/FrameProvider
を差し替えたテストができます。
using System;
using R3;
namespace Samples.R3Sample
{
// テスト対象
public class TestTargetObject : IDisposable
{
private readonly Subject<int> _subject = new();
// Publish()した値を一定時間後に出力するだけのObservable
public Observable<int> OutputDelayFrame => _subject.DelayFrame(30);
public Observable<int> OutputDelay => _subject.Delay(TimeSpan.FromSeconds(3));
public void Publish(int value)
{
_subject.OnNext(value);
}
public void Dispose()
{
_subject.Dispose();
}
}
}
using System;
using Microsoft.Extensions.Time.Testing;
using NUnit.Framework;
using Samples.R3Sample;
using R3;
namespace Samples.R3Tests
{
public class TestR3Observable
{
private FakeFrameProvider _fakeFrameProvider;
private FakeTimeProvider _fakeTimeProvider;
[SetUp]
public void SetUp()
{
// SetUpでデフォルトのFrameProviderを差し替える
_fakeFrameProvider = new FakeFrameProvider();
_fakeTimeProvider = new FakeTimeProvider();
ObservableSystem.DefaultFrameProvider = _fakeFrameProvider;
ObservableSystem.DefaultTimeProvider = _fakeTimeProvider;
}
[Test]
public void OutputDelayFrameのテスト()
{
using var target = new TestTargetObject();
// LiveListに変換
using var liveList = target.OutputDelayFrame.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
target.Publish(1);
target.Publish(2);
target.Publish(3);
// 30フレーム経過するまで出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 29フレーム経過させる
_fakeFrameProvider.Advance(29);
// まだ出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// さらに1フレーム経過させる
_fakeFrameProvider.Advance(1);
// 30フレーム経過したので出力されているはず
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
[Test]
public void OutputDelayのテスト()
{
using var target = new TestTargetObject();
// LiveListに変換
using var liveList = target.OutputDelay.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
target.Publish(1);
target.Publish(2);
target.Publish(3);
// 3秒経過するまで出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 2秒進める経過させる
_fakeTimeProvider.Advance(TimeSpan.FromSeconds(2));
// まだ出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// さらに1秒進める
_fakeTimeProvider.Advance(TimeSpan.FromSeconds(1));
// 計3秒経ったので値が発行された
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
}
}
[新機能] ObservableSystem
R3では全体のObservable
の挙動を設定するObservableSystem
という機能が追加されています。
// Observable内で発行された例外が処理されなかったときに最終的に到達するハンドラを登録できる
ObservableSystem.RegisterUnhandledExceptionHandler(ex => Debug.LogException(ex));
// デフォルトのTimeProvider/FrameProviderを指定できる(後述)
ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
R3で廃止/変更された機能(UniRxからの移行時に代替が必要なもの)
[変更] Schedulerが廃止
Scheduler
とはObservable
における「時間」「タイミング」「実行コンテキスト」を制御するための機構です。R3ではこのScheduler
は廃止されており、代わりにTimeProvider
およびFrameProvider
で制御される仕組みとなっています。
(TimeProvider
は.NET 8
で追加された「時間」を抽象化するための機構です。.NET 8
といいつつも単品パッケージとして公開されており、Nugetから導入することで2024年現在のUnityでも利用することができます)
「R3.Unity」を導入している場合はUnityTimeProvider
/UnityFrameProvider
という実装が追加されます。これを指定することでUnityの挙動に合わせた時間管理をR3で行うことができます。
TimeProvider/FrameProviderの指定
Observable
定義時にTimeProvider
/FrameProvider
を引数で指定できるものがあります。
Observable
ごとに挙動を変更したい場合は指定してください。
using System;
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class TimeProviderSample : MonoBehaviour
{
private void Start()
{
// Observable.EveryUpdateは一定のフレーム間隔でメッセージを発行する
// どのフレームタイミングでメッセージ発行するかを指定できる
Observable
.EveryUpdate(UnityFrameProvider.FixedUpdate, destroyCancellationToken)
.Subscribe(_ =>
{
// FixedUpdateと同じタイミングで実行される
});
// Observable.Timerは指定した時間が経過したらメッセージを発行する
// 時間の計測をUpdate()のタイミングで実施する
Observable
.Timer(TimeSpan.FromSeconds(1),
UnityTimeProvider.Update,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
}
}
}
とくにUnityTimeProvider
ですが、「Time.scale
の影響を受けるもの」「Time.scale
の影響を受けないもの」「Unityの時間とは独立して時間計測するもの」の3パターンの指定ができます。
用途によって使い分けましょう。
Observable
.Timer(TimeSpan.FromSeconds(1),
// Time.scaleの影響を受ける
// Unityが動作を停止していた場合は時間が進まない
UnityTimeProvider.Update,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
Observable
.Timer(TimeSpan.FromSeconds(1),
// Time.scaleの影響を受けないが、
// Unityが動作を停止していた場合は時間が進まない
UnityTimeProvider.UpdateIgnoreTimeScale,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
Observable
.Timer(TimeSpan.FromSeconds(1),
// Unityの挙動とは独立した時間計測
// Unityが動作を停止していた場合でも時間が進む
UnityTimeProvider.UpdateRealtime,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
デフォルトの指定
ObservableSystem.DefaultTimeProvider
およびObservableSystem.DefaultFrameProvider
を設定することで、グローバルで用いるProvider
のデフォルト値を変更することができます。
// デフォルトは両者ともに「Update」
ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
R3.Unityでは起動時に自動的にUpdate
が指定されるようになっています。変更したい場合は手動で上書きしてください。
[変更] CurrentThreadSchedulerが使えない
前述の廃止されたScheduler
のひとつにCurrentThreadScheduler
というものがありました。
このCurrentThreadScheduler
は「Observable
中でメッセージ発行が再帰したときに、メッセージ順序を末尾再帰に変換してくれる」という性質がありました。
この「再帰したときに末尾再帰へ変換する」という挙動をR3で再現したい場合はTrampoline()
というオペレータを使ってください。
再帰したときにメッセージ順序が変になる例
再帰したときにメッセージ順序が変になる例
Observable
のメッセージが再帰する例として「オンラインゲームでのプレイヤの参加通知」というものを挙げます。
- プレイヤーが新たに接続してきたときに
PlayerJoined
通知を発行する -
PlayerJoined
通知を受けて、ゲームのマネージャがそのプレイヤーをゲームに参加させる - プレイヤーがゲームに正常に参加したときに、
PlayerAddedToTeam
通知を発行する
という挙動を考えてみます。このときに発行されるイベントは「PlayerJoined
→ PlayerAddedToTeam
」という順序であってほしいです。(PlayerAddedToTeam
イベントが先に発行されてしまうと処理が壊れる可能性がある)
ですが、これをそのまま実装してしまうとメッセージ順序の入れ替わりが発生してしまいます。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class PlayerEventSample : MonoBehaviour
{
// ゲーム中で扱うイベント
private enum GameEvent
{
PlayerJoined,
PlayerAddedToTeam
}
// ゲームイベントを通知するためのSubject
private readonly Subject<GameEvent> _subject = new();
private void Start()
{
_subject.AddTo(this);
// イベント通知をいろんな場所で購読して処理している(というイメージ)
SubscriberA(_subject);
SubscriberB(_subject);
SubscriberC(_subject);
// 新しくプレイヤーが接続してきたことを通知する
_subject.OnNext(GameEvent.PlayerJoined);
}
// 購読者A
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
private void SubscriberA(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"A: {e}");
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
// その後、PlayerAddedToTeamイベントを通知する(というイメージ)
if (e == GameEvent.PlayerJoined)
{
_subject.OnNext(GameEvent.PlayerAddedToTeam);
}
}).RegisterTo(destroyCancellationToken);
}
// 購読者B
// イベント通知をログに出す
private void SubscriberB(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"B: {e}");
}).RegisterTo(destroyCancellationToken);
}
// 購読者C
// イベント通知をログに出す
private void SubscriberC(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"C: {e}");
}).RegisterTo(destroyCancellationToken);
}
}
}
A: PlayerJoined
A: PlayerAddedToTeam
B: PlayerAddedToTeam
C: PlayerAddedToTeam
B: PlayerJoined
C: PlayerJoined
「SubscriberA」は意図したとおりの順序になっていますが、BとCでは後から発行したPlayerAddedToTeam
が先に到達してしまいました。この順序の入れ替わりはAのSubscribe()
内で同じObservable
に対してメッセージ発行をしてしまった(再帰した)ために起きました。
この再帰時の順序の入れ替わりを防止し、「先に発行したメッセージは先に到達する」という状況(末尾再帰)に変換できるのがTrampoline
オペレータです。(UniRxではObserveOn(Scheduler.CurrentThread)
を使うことで順序の入れ替わりを防ぐことができた)
Trampolineで末尾再帰にする
Trampolineで末尾再帰にする
Trampoline()
を挟むことで、再帰したときに末尾再帰な形へとメッセージ順序が調整されます。なおShare()
はHot変換用のオペレータで、Trampoline
を通した1本のストリームを全員で共有させるために必要となります。
参考資料 : 【Reactive Extensions】 Hot変換はどういう時に必要なのか?
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class TrampolineSample : MonoBehaviour
{
// ゲーム中で扱うイベント
private enum GameEvent
{
PlayerJoined,
PlayerAddedToTeam
}
// ゲームイベントを通知するためのSubject
private readonly Subject<GameEvent> _subject = new();
private void Start()
{
_subject.AddTo(this);
// Trampoline()で再帰したときに末尾再帰になる
// Share()はTrampoline()を通した同じストリームを全員で共有するために必要
var observable = _subject.Trampoline().Share();
// イベント通知をいろんな場所で購読して処理している(というイメージ)
SubscriberA(observable);
SubscriberB(observable);
SubscriberC(observable);
// 新しくプレイヤーが接続してきたことを通知する
_subject.OnNext(GameEvent.PlayerJoined);
}
// 購読者A
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
private void SubscriberA(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"A: {e}");
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
// その後、PlayerAddedToTeamイベントを通知する(というイメージ)
if (e == GameEvent.PlayerJoined)
{
_subject.OnNext(GameEvent.PlayerAddedToTeam);
}
}).RegisterTo(destroyCancellationToken);
}
// 購読者B
// イベント通知をログに出す
private void SubscriberB(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"B: {e}");
}).RegisterTo(destroyCancellationToken);
}
// 購読者C
// イベント通知をログに出す
private void SubscriberC(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"C: {e}");
}).RegisterTo(destroyCancellationToken);
}
}
}
A: PlayerJoined
B: PlayerJoined
C: PlayerJoined
A: PlayerAddedToTeam
B: PlayerAddedToTeam
C: PlayerAddedToTeam
メッセージ順序が「先に発行した順」へと調整されました。
[変更] 例外のハンドリングが変更
UniRxのObservable
では例外発生時はOnError
メッセージが発行されObservable
は停止していました。
しかしR3では例外発生時の挙動が変更されており、発生した例外はOnErrorResume
メッセージとして扱われることになっています。このOnErrorResume
メッセージは「例外を通知するがObservable
は停止せずにそのまま動作を継続する」という挙動をします。
using var subject = new Subject<string>();
// 文字列をintに変換する
subject
.Select(int.Parse) // パース失敗時に例外が発生する
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
subject.OnNext("123"); // 出力は OnNext(123)
subject.OnNext("xyz"); // 出力は OnErrorResume: System.FormatException
subject.OnCompleted(); // 出力は OnCompleted: Success
UniRxで必要だった「エラー発生時はRetry()
などのオペレータを使ってObservable
の再構築をする」という処理が不要となりました。
そのためR3ではRetry()
オペレータは廃止されています。
OnErrorResumeAsFailure()
で停止させる
もしOnErrorResume
メッセージ発生時に従来どおりObservable
を停止させたい場合はOnErrorResumeAsFailure()
オペレータを使いましょう。
subject
// パース失敗時に OnErrorResume が発行される
.Select(int.Parse)
// OnErrorResume を OnCompleted(Exception) に変換する
.OnErrorResumeAsFailure()
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
Catch()
の挙動
例外発生時のハンドリングを行うことができるCatch()
オペレータですが、こちらが発火する対象はOnCompleted(Exception)
になっています。
// 文字列をintに変換する
subject
// パース失敗時に OnErrorResume が発行される
.Select(int.Parse)
// OnErrorResume を OnCompleted(Exception) に変換する
.OnErrorResumeAsFailure()
// CatchはOnCompleted(Exception)に反応する
.Catch<int, FormatException>(ex =>
{
Debug.LogError(ex);
// 例外発生時に差し替えるObservable
return Observable.Empty<int>();
})
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
subject.OnNext("123"); // 出力は OnNext(123)
subject.OnNext("xyz"); // 出力は OnCompleted: Success
subject.OnCompleted(); // 到達しない
[変更] ObservableのAwaiterが無い
UniRxではObservable
を直接async/await
で待つことができましたがR3ではなくなりました。
同等の機能を再現したい場合はLastAsync()
を使ってください。
var result = await Observable.Range(1, 10).LastAsync();
[変更] First/FirstOrDefault/Last/LastOrDefaultが無い
オペレータであるFirst
/FirstOrDefault
/Last
/LastOrDefault
/Single
/SingleOrDefault
がなくなりました。
代わりにそれぞれ-Async
がついた、Task<T>
化する機能が追加されています。
private async UniTaskVoid ExampleAsync(CancellationToken ct)
{
// 最後の値を待機する
var lastValue = await Observable.Range(1, 10).LastAsync(cancellationToken: ct);
// 最初の値を待機する
var firstValue = await Observable.Range(1, 10).FirstAsync(cancellationToken: ct);
}
もしオペレータとしての動作を期待する場合はTake(1)
/TakeLast(1)
を使いましょう。
// Firstとだいたい一緒(ただしEmptyの時にエラーにはならない)
Observable.Range(1, 10)
.Take(1)
.Subscribe(x => Debug.Log(x));
// Lastとだいたい一緒(ただしEmptyの時にエラーにはならない)
Observable.Range(1, 10)
.TakeLast(1)
.Subscribe(x => Debug.Log(x));
// FirstOrDefaultとだいたい一緒
Observable.Empty<int>()
.Take(1).DefaultIfEmpty()
.Subscribe(x => Debug.Log(x));
// LastOrDefaultとだいたい一緒
Observable.Empty<int>()
.TakeLast(1).DefaultIfEmpty()
.Subscribe(x => Debug.Log(x));
// オペレータとしてのSingleは再現できないかも
(余談ですが、-Async
なメソッドは他にも結構追加されています。MaxAsync()
とかAllAsync()
とかContainsAsync()
とか)
[変更] BehaviorSubject/AsyncSubjectが無い
BehaviorSubject
とAsyncSubject
はR3では実装されていません。
BehaviorSubject
は挙動がReactiveProperty
とほぼ同じなため、ReactiveProperty
を代替として使いましょう。
AsyncSubject
についてはUniTaskCompletionSource
がほぼ同じ挙動をするのでUniTaskを導入してこちらを使うとよいかも。
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class AlternativeAsyncSubject : MonoBehaviour
{
private void Start()
{
// UniTaskCompletionSourceをAsyncSubjectの代わりに使ってみる
var utcs = new UniTaskCompletionSource<int>();
// OnNext(100) + OnCompleted() と同じ
utcs.TrySetResult(100);
// OnError(new Exception()) と同じ
utcs.TrySetException(new Exception());
// OnError(new OperationCanceledException()) とだいたい同じ
utcs.TrySetCanceled();
// -----------------------------------------------
// ラムダ式で待ち受け。ただしこれだとキャンセルができない
utcs.Task.ContinueWith(result => Debug.Log(result));
// async/awaitで待ち受け
// AttachExternalCancellationでキャンセルを外付けしておくとよい
UniTask.Void(async () =>
{
var result = await utcs.Task.AttachExternalCancellation(destroyCancellationToken);
Debug.Log(result);
});
// Observableとして扱いたいなら一度ValueTaskを介して変換する
// UniTask -> ValueTask -> R3.Observable
utcs.Task
.AsValueTask()
.ToObservable()
.Subscribe(x => Debug.Log(x))
.AddTo(this);
}
}
}
UniTask
-> R3.Observable
の変換が頻発するなら拡張メソッドを用意してもいいかも。
UniTask->R3.Observable変換の拡張メソッド例
using Cysharp.Threading.Tasks;
using R3;
namespace UniTaskR3ExtensionsSample
{
public static class UniTaskR3Extensions
{
public static Observable<T> ToR3Observable<T>(this UniTask<T> task)
{
return task.AsValueTask().ToObservable();
}
public static Observable<Unit> ToR3Observable(this UniTask task)
{
return task.AsValueTask().ToObservable();
}
public static Observable<T> ToR3Observable<T>(this UniTaskCompletionSource<T> tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<Unit> ToR3Observable(this UniTaskCompletionSource tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<T> ToR3Observable<T>(this AutoResetUniTaskCompletionSource<T> tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<Unit> ToR3Observable(this AutoResetUniTaskCompletionSource tcs)
{
return tcs.Task.ToR3Observable();
}
}
}
var utcs = new UniTaskCompletionSource<int>();
utcs.ToR3Observable()
.Subscribe(x => Debug.Log(x))
.AddTo(this);
[変更] IReactiveProperty<T>
/IReadOnlyReactiveProperty<T>
インタフェースが無い
ReactiveProperty<T>
のインタフェース定義であるIReactiveProperty<T>
/IReadOnlyReactiveProperty<T>
がR3では存在しません。
もし読み取り専用でReactiveProperty<T>
を公開したいときは、IReadOnlyReactiveProperty<T>
インタフェースの代わりにReadOnlyReactiveProperty<T>
にキャストして公開しましょう。
using R3;
namespace Samples.R3Sample
{
public class ReactivePropertySample
{
// こっちが本体
private readonly ReactiveProperty<int> _rp = new();
// 公開するプロパティ
public ReadOnlyReactiveProperty<int> Value => _rp;
}
}
[変更] ReactiveCollection/ReactiveDictionaryが無い
かなり便利だったReactiveCollection<T>
およびReactiveDictionary<TKey, TValue>
はR3に未実装です(v0.1.23現在)。その代わり、他にもリアクティブなコレクション構造が追加されているObservableCollections
という別のライブラリがあるのでこちらを使いましょう。むしろObservableCollectionsの方がリアクティブなコレクション実装が増えていて便利です。
なお、R3と組み合わせるためにはObservableCollectionsとObservableCollections.R3の2つのパッケージを導入する必要があります。NugetForUnityから導入してください。
using ObservableCollections;
using UnityEngine;
using R3;
namespace Samples.R3Sample
{
public class AlternativeReactiveDictionary : MonoBehaviour
{
private void Start()
{
// ReactiveDictionaryの代わり
var dic = new ObservableDictionary<int, string>();
// 新しい要素が追加されたイベント
dic.ObserveAdd(destroyCancellationToken)
.Subscribe(collectionAddEvent =>
{
var (key, value) = collectionAddEvent.Value;
Debug.Log($"Add [{key}]={value}");
});
// 要素が上書きされたイベント
dic.ObserveReplace(destroyCancellationToken)
.Subscribe(replaceEvent =>
{
var key = replaceEvent.NewValue.Key;
var newValue = replaceEvent.NewValue.Value;
var oldValue = replaceEvent.OldValue.Value;
Debug.Log($"Replace [{key}]={oldValue} -> {newValue}");
});
dic[1] = "hoge";
dic[2] = "fuga";
dic[1] = "piyo";
}
}
}
Add [1]=hoge
Add [2]=fuga
Replace [1]=hoge -> piyo
[変更] コルーチンとの連携機能が無い
UniRxではObservable.FromCoroutine
やCoroutine.ToObservable
など、Unityコルーチンと連携した機能がありました。
ですがこれらの機能はR3ではオミットされています。
その代わりasync/await
との連携がUniRxと比べ強化されているのでコルーチンの代わりに「UniTask + async/await
」を使いましょう。
// async/awaitを用いて手続き的に値を発行するObservableを作れる
Observable.Create<int>(async (observer, ct) =>
{
observer.OnNext(1);
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
observer.OnNext(2);
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
observer.OnNext(3);
observer.OnCompleted();
})
.Subscribe(x => Debug.Log(x))
.RegisterTo(cancellationToken);
[変更] AsyncReactiveCommandが無い
UniRxにはAsyncReactiveCommand
というUIと組み合わせると便利な機能がありましたが、R3では消滅しています。
(またToReactiveCommand
も消えているが、こっちは大した実装じゃないのでUniRxを参考に自前で拡張メソッドを実装しちゃえばよさそう)
AsyncReactiveCommand
はUIの制御と組み合わせ、「非同期処理が実行中は関連するボタン制御を無効化する」といった用途に使うことができました。
これをAsyncReactiveCommand
無しで再現するならSubscribeAwait
を使うとよいかと。
AsyncReactiveCommandでのUI制御をSubscribeAwaitで代替してみる
AsyncReactiveCommandでのUI制御をSubscribeAwaitで代替してみる
SubscribeAwait
でReactiveProperty<bool>
を制御し、同時に複数のボタンのInteractable
を切り替えることでAsyncReactiveCommand
と似た挙動を再現できます。
using System;
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
using Random = UnityEngine.Random;
namespace Samples.R3Sample
{
public class AlternativeAsyncReactiveCommandSample : MonoBehaviour
{
// 各種ボタン
[SerializeField] private Button _buttonA;
[SerializeField] private Button _buttonB;
[SerializeField] private Button _buttonC;
// 状態表示用のテキスト
[SerializeField] private Text _text;
// ボタン制御用のReactiveProperty(ゲート)
private readonly ReactiveProperty<bool> _gate = new(true);
private void Start()
{
_gate.AddTo(this);
// ゲートがfalseのときはボタンを押せない状態にする
_gate.SubscribeToInteractable(_buttonA).RegisterTo(destroyCancellationToken);
_gate.SubscribeToInteractable(_buttonB).RegisterTo(destroyCancellationToken);
_gate.SubscribeToInteractable(_buttonC).RegisterTo(destroyCancellationToken);
// 状態を可視化する
_gate.Select(x => $"Gate={x}").SubscribeToText(_text);
// ボタンAが押されたときの処理
_buttonA.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodAAsync(ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
// ボタンBが押されたときの処理
_buttonB.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodBAsync(ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
// ボタンCが押されたときの処理
_buttonC.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodCAsync (ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
}
// 非同期処理実行中はゲートを閉める
private async UniTask GateControlAsync(UniTask task)
{
_gate.Value = false;
try
{
await task;
}
finally
{
_gate.Value = true;
}
}
// なにか各種非同期な処理があったとする
private async UniTask MethodAAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
private async UniTask MethodBAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
private async UniTask MethodCAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
}
}
[変更] MessageBrokerが無い
インメモリpub/subのであるMessageBroker
がUniRxには存在しましたがR3ではオミットされています。
代替としては、これの上位互換的な存在であるMessagePipe
を使うとよいでしょう。
なお、この記事の執筆時点では MessagePipe
-> R3.Observable
に直接変換するメソッドは存在していませんでした。将来的に実装される可能性があります。一応、もしR3.Observable
に変換したい場合はSystem.IObservable
への変換を一度挟むことで実現できます。
private void Sample<T>(ISubscriber<T> subscriber)
{
subscriber // MessagePipe.ISubscriber<T>
.AsObservable() // System.IObservable<T>
.ToObservable() // R3.Observable<T>
.Subscribe();
}
ただこのクッションが気持ち悪いという人は拡張メソッドを用意するとよさそう。
シンプルな拡張メソッドの例
using MessagePipe;
using R3;
namespace My.MessagePipe.R3.Ext
{
public static class MessagePipeR3Ext
{
public static Observable<TMessage> AsR3Observable<TMessage>(this ISubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(filters).ToObservable();
}
public static Observable<TMessage> AsR3Observable<TMessage>(this IBufferedSubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(filters).ToObservable();
}
public static Observable<TMessage> AsR3Observable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber,
TKey key,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(key, filters).ToObservable();
}
}
}
ただしこの書き方だと2回変換が挟まってちょっと無駄なので、そこを気にするならMessagePipeのObservable変換コードをR3用に書き直したものを用意するでもいいかも。
MessagePipeから直接R3に変換する拡張メソッドの例
// MessagePipe
// MIT License
// Copyright (c) 2021 Cysharp, Inc.
// https://github.com/Cysharp/MessagePipe/blob/master/LICENSE
using System;
using MessagePipe;
using R3;
namespace My.MessagePipe.R3.Ext
{
public static class MessagePipeR3Ext
{
public static Observable<TMessage> AsR3Observable<TMessage>(this ISubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableSubscriber<TMessage>(subscriber, filters);
}
public static Observable<TMessage> AsR3Observable<TMessage>(this IBufferedSubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableBufferedSubscriber<TMessage>(subscriber, filters);
}
public static Observable<TMessage> AsR3Observable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber,
TKey key,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableSubscriber<TKey, TMessage>(key, subscriber, filters);
}
}
internal sealed class ObservableSubscriber<TKey, TMessage> : Observable<TMessage>
{
readonly TKey key;
readonly ISubscriber<TKey, TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableSubscriber(TKey key,
ISubscriber<TKey, TMessage> subscriber,
MessageHandlerFilter<TMessage>[] filters)
{
this.key = key;
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(key, new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObservableSubscriber<TMessage> : Observable<TMessage>
{
readonly ISubscriber<TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableSubscriber(ISubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
{
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObservableBufferedSubscriber<TMessage> : Observable<TMessage>
{
readonly IBufferedSubscriber<TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableBufferedSubscriber(IBufferedSubscriber<TMessage> subscriber,
MessageHandlerFilter<TMessage>[] filters)
{
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObserverMessageHandler<TMessage> : IMessageHandler<TMessage>
{
readonly Observer<TMessage> observer;
public ObserverMessageHandler(Observer<TMessage> observer)
{
this.observer = observer;
}
public void Handle(TMessage message)
{
observer.OnNext(message);
}
}
}
[変更] UniRx.ToolKit.ObjectPoolが無い
UniRxにはオブジェクトプールの機構がありましたが、R3にはありません。
代替としては、「Unity公式のObjectPool」や「uPools」などが挙げられます。
[変更] Debugオペレータ
UniRxにはDebug
というオペレータがあり、Observable
のメッセージを表示してくれる便利機能がありました。ただR3には存在していないので、必要ならば拡張メソッドとして自分で用意しておくと便利です。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public static class ObservableDebugExt
{
public static Observable<T> Debug<T>(this Observable<T> source, string label = null)
{
#if DEBUG
var l = (label == null) ? "" : $"[{label}] ";
return source.Materialize()
.Do(
onNext: x => UnityEngine.Debug.Log(l + x),
onDispose: () => UnityEngine.Debug.Log($"{l}OnDispose"),
onSubscribe: () => UnityEngine.Debug.Log($"{l}OnSubscribe")
)
.Dematerialize();
#else
return source;
#endif
}
public static Observable<T> Debug<T>(this Observable<T> source, ILogger logger)
{
#if DEBUG
return source.Materialize()
.Do(
onNext: x => logger.Log(x.ToString()),
onDispose: () => logger.Log("OnDispose"),
onSubscribe: () => logger.Log("OnSubscribe")
)
.Dematerialize();
#else
return source;
#endif
}
}
}
Observable.Range(0, 3)
.Debug("Observable.Range")
.Subscribe();
[Observable.Range] OnSubscribe
[Observable.Range] 0
[Observable.Range] 1
[Observable.Range] 2
[Observable.Range] Success
[Observable.Range] OnDispose
その他の細かい変更点
Select/Where/Subscribe/Doで外部変数を渡せるようになった
Select
/Where
/Subscribe
で外部変数をObservable
構築時に渡せることができるようになりました。これによってラムダ式から外部変数のキャプチャによるGCアロケートを避けることができます。
using R3;
using UnityEngine;
namespace Samples
{
public class StateSample : MonoBehaviour
{
[SerializeField] private float _fallThreshold = -10;
private void Start()
{
Observable.EveryValueChanged(transform, t => t.position, destroyCancellationToken)
// ラムダ式から外部変数をキャプチャさせない
.Where(_fallThreshold, (position, threshold) => position.y < threshold)
.Subscribe(_ => Destroy(gameObject));
}
}
}
Observable.EveryUpdateがキャンセル可能に
UniRxにもあったObservable.EveryUpdate
ですが、CancellationToken
の指定が可能となりました。(あとUnityFrameProvider
でタイミングの指定もできます)
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class EveryUpdateSample : MonoBehaviour
{
private void Start()
{
// このGameObjectに寿命が連動したObservableになる
// 実質 this.UpdateAsObservable()
Observable
.EveryUpdate(destroyCancellationToken)
.Subscribe(_ => OnUpdate());
}
private void OnUpdate()
{
}
}
}
EveryValueChangedの記法変更
指定したオブジェクトを毎フレーム差分チェックし、変化があったら通知するという機能であったEveryValueChanged
ですが、記法が変わりました。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class EveryValueChangedSample : MonoBehaviour
{
private void Start()
{
// UniRxではこうだったが
// transform.ObserveEveryValueChanged(t => t.position).Subscribe(p => Debug.Log(p));
// R3での記法はこうなった
Observable
.EveryValueChanged(transform, t => t.position)
.Subscribe(p => Debug.Log(p));
// CancellationTokenを渡すこともできる
Observable
.EveryValueChanged(transform, t => t.rotation, destroyCancellationToken)
.Subscribe(r => Debug.Log(r));
}
}
}
ReactivePropertyにIEqualityComparerが指定可能に
地味な変更点ですが、ReactiveProperty<T>
のコンストラクタでIEqualityComparer<T>
が指定できるようになりました。
UniRxでは同値判定のカスタマイズができなかったのでこの変更は個人的には嬉しいです。
ReactivePropertyのSkipLatestValueOnSubscribe がなくなった
ReactiveProperty<T>
を最初にSubscribe
したときの値の発行をスキップするSkipLatestValueOnSubscribe()
がなくなりました。
といっても処理としてはSkip(1)
を挟むだけなので、必要ならSkip(1)
を使いましょう。
またSetValueAndForceNotify()
もなくなっていますが、こちらはOnNext()
で代替できます。
まとめ
R3とUniRxは似ているものの根本の思想が異なるライブラリであるため、かなり多くの差分が存在します。
そのため、すべてをまとめきれてはいないとは思いますがご容赦ください。
以下ポエム
個人的にR3のどこが気に入ったか
SubscribeAwait
-
FakeTimeProvider
/FakeFrameProvider
によるユニットテスト Observable Tracker
UniRxでできなかったこと/問題になっていたことを解消しているので、このあたりはだいぶ気に入りました。
UniRxからR3に乗り換えるべきかどうか
個人的な意見としては、「可能であるならUniRxからR3に乗り換えるべき」だと考えます。
というのもR3の方が最新C#の機能を取り込み、機能としてもパフォーマンスとしてもかなり洗練されたライブラリに仕上がっています。
UniRxも決して悪いライブラリではないのですが、「R3かUniRxかどっちを取るか」と言われたら迷うこと無く「R3」です。
ただしR3はフル機能を活かすならUnity 2022.2以降である必要があるため、プロジェクトによってはそこが障害となる可能性があります(その場合はUniRxを継続して利用し続けるしかないです)
Unityバージョンの問題をクリアしたとしても、UniRxからR3は挙動が大きく異なっている部分(とくにエラーハンドリング周り)があるため置き換えるのは大変かもしれません。そのため「現状UniRxで特に困ってない」というのであれば無理に載せ替えないのも選択肢としてはありだと思います。
UniTaskとasync/awaitがすでにあって便利なのに、R3を導入するメリットはあるのか
メリットはあります。
そもそも「UniTask + async/await
」と「R3」は扱う領域が異なります。
Rxは「PUSH型のインメモリなメッセージング機構」「LINQ to Events
」としての機能が非常に強力であり、async/await
では代替不可能です。
PUSH型のイベントメッセージングが必要な場面では「R3」を使い、非同期処理やPULL型の制御を行う場面では「UniTask, async/await
」を使う、といった様にこの2つは併用することが推奨されると考えます。むしろR3はasync/await
との併用を前提に作られているので、組み合わせて使いましょう!