前提
「R3」とは、ReactiveExtensionsの最新の環境のC#に合わせて再構築したライブラリです。Unityでは「UniRx」というライブラリがありましたが、大雑把にいえば「UniRxを最新の環境にあわせてリメイク」したものという認識でよいでしょう。
詳しくは別記事でまとめてあります。
またこの記事執筆時点での環境は次のとおりです。
- Unity - 2023.1.14f1
- R3 - 1.0.0
- ObservableCollections - 2.0.1
- NuGetForUnity - 4.0.2
今回の概要
「UniRx」と「R3」の機能の比較、R3での新機能や廃止された機能、UniRxからR3に置き換えるときの代替などについて紹介します。(細かい部分まですべては拾いきれないので、紹介漏れはご容赦ください。またUnity向けでない機能などは省略しています)
この記事中に登場するサンプルコードは別途記載がない限りはCC0です。自由にコピペして使ってください。
(ただし発生したトラブルや問題については責任を負いません)
またサンプルプロジェクトはGitHubにて公開しています。
動作環境などの違い
最低Unityバージョン
UniRx
UniRxでは最低バージョンは特に存在せず、Unity 2017頃のかなり古いUnityバージョンでも動作します。
R3
R3では最低でもUnity 2021.3以上である必要があります。
補足: destroyCancellationToken
R3ではキャンセルをCancellationTokenによって管理する仕組みになっています。
Unity 2022.2以降であればMonoBehaviour上でdestroyCancellationTokenが利用できるため、こちらを使うのが便利です。
Unity 2022.2未満の場合は代わりにR3が提供するObservableDestroyTriggerを使うことで同等の機能が得られます。
(Unity2022.2以降でObservableDestroyTriggerを使っても問題はありません)
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
public class DestroySample : MonoBehaviour
{
private void Start()
{
// Unity 2022.2以降ならdestroyCancellationTokenが使える
Observable
.Timer(TimeSpan.FromSeconds(3), destroyCancellationToken)
.Subscribe();
// destroyCancellationTokenが使えない場合はGetCancellationTokenOnDestroyを代わりに使えばOK
// (Unity2022.2以降なら内部的にdestroyCancellationTokenを返すだけなのでノーコスト)
Observable
.Timer(TimeSpan.FromSeconds(3), this.GetCancellationTokenOnDestroy())
.Subscribe();
}
}
導入方法
UniRx
UniRx次のいずれかの方法で導入する必要があります。
- unitypackage
- UPM(Git)
- UPM(OpenUPM)
R3
R3では「コアモジュール」と「Unityプラグイン」の2つに別れており、Unityでフル機能を動作させる場合はこの両者をインストールする必要があります。R3の公式ドキュメントに両者のインストール方法が記載されています。
コアモジュールはNuget経由でのインストールが必要なため、NugetForUnityを使う方法が推奨されます。Unity用プラグイン(R3.Unity)はGit経由でUnityPackageManagerからインストールしてください。
詳しくは公式ドキュメントを参照してください。
なおasmdefでモジュール管理をしている場合は、R3.Unityを参照に追加してください。
根本的な挙動の違い
R3ではObservableの概念が根本から見直されているため、UniRxと挙動が大きく異なります。
-
OnErrorメッセージがOnErrorResumeメッセージに変更された -
OnCompletedメッセージ発行時に「正常終了」か「異常終了(例外込み)」かを選べるようになった - すべての
ObservableはOnCompletedメッセージを最後に発行できるようになった -
Schedulerが廃止された -
async/awaitとの連携がやりやすくなった -
CancellationTokenで制御しやすくなった
こちらについては別の記事ですでに解説済みですので次の記事を参照してください。
大きな変更点はREADMEに書いてある
R3のREADMEに差分が書いてあるので、まずはそちらを読みましょう。
UniRxからR3に移行しても(ほぼ)そのまま使える機能
UniRxでよく使われていた機能はR3にも存在します。そのため次の機能についてはR3に以降してもほぼ同じように使えます。
Trigger(MonoBehaviourのイベント変換)
UniRxに存在したMonoBehaviourのイベントをObservableに変換する機能(Trigger)ですが、R3でも利用可能です。
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class TriggerSample : MonoBehaviour
{
private void Start()
{
// このGameObjectに紐づいたOnCollisionEnterをObservableとして取得できる
this.OnCollisionEnterAsObservable()
.Subscribe(collision =>
{
Debug.Log("OnCollisionEnter: " + collision.gameObject.name);
});
// Update()をObservableとして取得できる
this.UpdateAsObservable()
.Subscribe(_ =>
{
Debug.Log("Update!");
});
// 他にもいろいろある
}
}
}
AddTo(MonoBehaviour)
MonoBehaviourの寿命にIDisposableを連動させるAddTo(this)ですが、R3でも使えます。
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class AddToSample : MonoBehaviour
{
[SerializeField] private GameObject _childObject;
private void Start()
{
// childObjectに紐づいたOnCollisionEnterをObservableとして取得
_childObject
.OnCollisionEnterAsObservable()
.Subscribe(collision =>
{
Debug.Log("OnCollisionEnter: " + collision.gameObject.name);
})
// Observableの寿命をこのMonoBehaviourに紐付ける
.AddTo(this);
}
}
}
uGUIコンポーネントのイベント変換
UnityEngine.UI.Buttonなどのいわゆる「uGUI」のイベントをObservableに変換する機能はUniRxから引き続き利用可能です。
どのようなイベントが利用可能かはこちらを参照してください。
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class GuiEventSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private InputField _inputField;
[SerializeField] private Slider _slider;
[SerializeField] private Text _text;
private void Start()
{
// ボタンのクリック
_button
.OnClickAsObservable()
.Subscribe(_ => Debug.Log("Button Clicked!"))
.AddTo(this);
// InputFieldのテキスト変更
_inputField.OnValueChangedAsObservable()
.Subscribe(txt => Debug.Log("InputField Text: " + txt))
.AddTo(this);
// Sliderの値変更
_slider.OnValueChangedAsObservable()
.Subscribe(v => Debug.Log("Slider Value: " + v))
.AddTo(this);
// InputFieldのテキストをTextに反映
_inputField.OnValueChangedAsObservable()
.SubscribeToText(_text)
.AddTo(this);
}
}
}
R3での新機能(UniRxには無かった機能)
[新機能] SubscribeAwait/SelectAwait/WhereAwait
次世代Rx「R3」解説でも解説しましたが、SubscribeやSelect/Whereでasync/awaitを併用できる版が追加されました。R3の場合はasync/awaitの完了とメッセージ処理をいい感じに制御してくれます。(UniTaskと組み合わせるとさらに便利!)
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class SubscribeAwaitSample1 : MonoBehaviour
{
[SerializeField] private Button _goButton;
private void Start()
{
// ボタンが押されたら1秒間前進する
// 連打された場合はその数だけ進む
_goButton.OnClickAsObservable()
.SubscribeAwait(async (_, ct) =>
{
var time = Time.time;
while (Time.time - time < 1f)
{
transform.position += Vector3.forward * Time.deltaTime;
await UniTask.Yield(ct);
}
},
AwaitOperation.Sequential,
// configureAwaitはtrueから変更しないことを推奨
configureAwait: true)
.AddTo(this);
}
}
}
なお、configureAwaitはTrueを指定しましょう。(デフォルトTrueです)
Falseを指定した場合、実行コンテキストが意図せずにスレッドプールへ切り替わってしまう場合があります。
また、AwaitOperationというパラメータを指定することで非同期処理の実行中(awaitの処理が終わる前)に次のメッセージが到達してしまったときの挙動を調整することができます。
| AwaitOperation |
await中に次のイベントが来たときの挙動 |
備考 |
|---|---|---|
| Sequential | 今実行中の処理を優先。余剰なイベントはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。 | |
| Drop | 今実行中の処理を優先。余剰なイベントは無視してなかったことにする。 | |
| Switch | 今実行中の非同期処理をキャンセル。 新しく到達したイベントの処理を優先して開始する。 | キャンセル処理はCancellationTokenを使って自分で実装する必要がある。 |
| Parallel | 新しく来たイベントを即座に処理する。処理が終わったものから早いもの勝ちで出力される。 |
maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。 |
| SequentialParallel※ | 新しく来たイベントを即座に処理する。処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 |
maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。 |
| ThrottleFirstLast | 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 |
ThrottleFirstとThrottleLastが合体した挙動 |
※ SequentialParallelはWhereAwait/SelectAwaitでのみ利用可
(補足)UniRxだとどういう挙動をしていたか
UniRxだとどういう挙動をしていたか
[新機能] Debounce/ThrottleFirst/ThrottleLastの非同期対応
Debounce(旧名Throttle)/ThrottleFirst/ThrottleLast(旧名Sample)はUniRxにもあったオペレータですが、R3では非同期処理に対応しました。つまりasync/awaitと併用ができます。
それぞれの非同期版の挙動は次のとおりです。
-
Debounce:メッセージが到達したら非同期処理を実行。非同期処理が完遂したらそのメッセージを発行する。非同期処理中に次のメッセージが来た場合は実行中に非同期処理をキャンセルして再び非同期処理を実行しなおす。 -
ThrottleFirst:メッセージが到達したらそれを通過させた後に非同期処理を実行、その処理が終わるまでメッセージを遮断する -
ThrottleLast:メッセージが到達したら非同期処理を実行してメッセージを遮断、その処理が終わった時最後に届いていたメッセージを1つだけ発行する」
DebounceとThrottleLastの違いは非同期処理をやり直すか完遂するかの違いです。Debounceはメッセージが来るたびに非同期処理をやり直し。ThrottleLastは一度走り始めたらそれが完遂するまでやりきります。
Debounceの例
Debounceを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class DebounceSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// Debounceで遮断
.Debounce((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
ThrottleFirstの例
ThrottleFirstを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class ThrottleFirstSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// ThrottleFirstで遮断
.ThrottleFirst((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
ThrottleLastの例
ThrottleLastを使ったサンプル
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Samples.R3Sample
{
public class ThrottleLastSample : MonoBehaviour
{
[SerializeField] private Button _button;
[SerializeField] private Text _buttonText;
[SerializeField] private Text _outputText;
[SerializeField] private Slider _processSlider;
private readonly ReactiveProperty<int> _currentValue = new();
private void Start()
{
_currentValue.AddTo(this);
// ボタンが押されたらカウンタを更新
_button.OnClickAsObservable()
.Subscribe(_ => _currentValue.Value++)
.AddTo(this);
// カウンタの数値をボタンに反映
_currentValue.SubscribeToText(_buttonText).AddTo(this);
// カウンタが増加したらオペレータを通してOutputのテキストに出力
_currentValue
.Skip(1)
// ThrottleLastで遮断
.ThrottleLast((_, ct) => UpdateSliderAsync(1f, ct))
.SubscribeToText(_outputText)
.AddTo(this);
}
// 一定時間待機する(その状況をスライダーに反映)
private async UniTask UpdateSliderAsync(float waitSeconds, CancellationToken ct)
{
_processSlider.value = 0;
// 合計で1秒待機する
var currentTime = 0f;
while (!ct.IsCancellationRequested && currentTime < waitSeconds)
{
await UniTask.Yield();
currentTime += Time.deltaTime;
// 経過状況をスライダーに反映
_processSlider.value = Mathf.Clamp01(currentTime / waitSeconds);
}
}
}
}
[新機能] IDisposable.RegisterTo(CancellationToken)
IDisposableに対する拡張メソッドとしてRegisterToが定義されています。これにより指定のCancellationTokenにIDisposable.Dispose()を連動させることができます。
機能自体はUniTaskにも存在したIDisposable.AddTo(CancellationToken)と同じです。(名前がUniTaskとR3で衝突したのでRegisterToに変えた、とのこと)
using R3;
using R3.Triggers;
using UnityEngine;
namespace Samples.R3Sample
{
public class RegisterToSample : MonoBehaviour
{
private void Start()
{
// このObservableの寿命をCancellationTokenに連動させる
this.UpdateAsObservable()
.Subscribe(_ => Debug.Log("Update!"))
.RegisterTo(destroyCancellationToken);
}
}
}
[新機能] ReplayFrameSubject
指定した期間分だけ発行されたメッセージをキャッシュしてくれるReplaySubjectのフレーム指定版が登場しました。後述するFrameProviderと組み合わせることで「過去一定フレーム以内に発行されたメッセージをすべてキャッシュする」といった使い方ができます。メッセージを発行するタイミングとSubscribe()をするタイミングがズレている場合などに使うと便利です。
// 例:過去FixedUpdate()10フレーム分の期間に発行されたメッセージをすべてキャッシュするSubject
var replayFrameSubject = new ReplayFrameSubject<Unit>(window: 10, UnityFrameProvider.FixedUpdate);
[新機能] Observable Tracker
Observable Trackerは購読中のObservableをUnityEditor上で可視化できるツールです。控えめに言ってもすごくよいです。Observableは購読の解除漏れがあったときそれに気づきにくいのが問題でしたが、Observable Trackerを使うことで解決します。
どのObservableがどのスタックで動いており、いつから稼働しているかなどを一覧で可視化することができます。
[新機能] SerializableReactiveProperty<T>
SerializableReactiveProperty<T>を使うことで、ReactivePropertyをそのままUnityのインスペクターウィンドウに表示できるようになりました。UniRxでは任意の型を表示したいときにEditor拡張を用意する必要がありましたが、R3ではジェネリックをそのまま使うことができます。
public class NewBehaviourScript : MonoBehaviour
{
public SerializableReactiveProperty<int> rpInt;
public SerializableReactiveProperty<long> rpLong;
public SerializableReactiveProperty<byte> rpByte;
public SerializableReactiveProperty<float> rpFloat;
public SerializableReactiveProperty<double> rpDouble;
public SerializableReactiveProperty<string> rpString;
public SerializableReactiveProperty<bool> rpBool;
public SerializableReactiveProperty<Vector2> rpVector2;
public SerializableReactiveProperty<Vector2Int> rpVector2Int;
public SerializableReactiveProperty<Vector3> rpVector3;
public SerializableReactiveProperty<Vector3Int> rpVector3Int;
public SerializableReactiveProperty<Vector4> rpVector4;
public SerializableReactiveProperty<Color> rpColor;
public SerializableReactiveProperty<Rect> rpRect;
public SerializableReactiveProperty<Bounds> rpBounds;
public SerializableReactiveProperty<BoundsInt> rpBoundsInt;
public SerializableReactiveProperty<Quaternion> rpQuaternion;
public SerializableReactiveProperty<Matrix4x4> rpMatrix4x4;
public SerializableReactiveProperty<FruitEnum> rpEnum;
public SerializableReactiveProperty<FruitFlagsEnum> rpFlagsEnum;
}
[新機能] LiveList
R3にはLiveList<T>というObservable<T>から変換可能なコレクションが用意されています。LiveList<T>は「Observable<T>を購読し、発行されたメッセージを自動的にリストに追加する」という挙動をします。
テスト時などに活用すると便利です。
using System;
using NUnit.Framework;
using R3;
namespace Samples.R3Tests
{
public class LiveListSample
{
[Test]
public void LiveListが便利()
{
using var subject = new Subject<int>();
// Observable -> LiveList
using var liveList = subject.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 「1」を発行
subject.OnNext(1);
// 発行された「1」が反映されている
CollectionAssert.AreEqual(new[] { 1 }, liveList);
subject.OnNext(2);
subject.OnNext(3);
// 3つの値が反映されている
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
}
}
なおLiveListをDispose()するとObservableの購読も同時に終了します。
[新機能] Observableのユニットテストサポート(FakeFrameProvider)
話が前後してしまいますが、R3ではSchedulerの概念が廃止され代わりにTimeProvider/FrameProviderにより時間の制御が行われています。そのうちFrameProviderのデバッグ用実装がFakeFrameProviderです。これを用いることでObservableのテストが書きやすくなります。
なお、TimeProviderのデバッグ用実装であるFakeTimeProviderはMicrosoft.Extensions.Time.Testingパッケージとして公開されており、こちらは別途Nuget経由で導入する必要があります。
メモ:FakeTimeProviderの導入方法
FakeTimeProviderの導入方法
FakeTimeProviderを使いたい場合はMicrosoft.Extensions.TimeProvider.Testingパッケージを導入する必要があります。
ただこちらのパッケージですが、NugetForUnityから導入するとMicrosoft.Bcl.AsyncInterfacesとUnityが干渉してうまく導入できないことがあります。
そのため現時点ではNuGetのWebページから.NET Framework 4.6.2向けのdllを手動ででDLして直接プロジェクトに入れてしまう方法が推奨です。
テスト例
下準備としてテストアセンブリに
- R3.Unity
- R3.dll
- Microsoft.Bcl.TimeProvider.dll
- Microsoft.Extensions.TimeProvider.Testing.dll
を登録してください。
その上で、次のようなテストを書くことでTimeProvider/FrameProviderを差し替えたテストができます。
using System;
using R3;
namespace Samples.R3Sample
{
// テスト対象
public class TestTargetObject : IDisposable
{
private readonly Subject<int> _subject = new();
// Publish()した値を一定時間後に出力するだけのObservable
public Observable<int> OutputDelayFrame => _subject.DelayFrame(30);
public Observable<int> OutputDelay => _subject.Delay(TimeSpan.FromSeconds(3));
public void Publish(int value)
{
_subject.OnNext(value);
}
public void Dispose()
{
_subject.Dispose();
}
}
}
using System;
using Microsoft.Extensions.Time.Testing;
using NUnit.Framework;
using Samples.R3Sample;
using R3;
namespace Samples.R3Tests
{
public class TestR3Observable
{
private FakeFrameProvider _fakeFrameProvider;
private FakeTimeProvider _fakeTimeProvider;
[SetUp]
public void SetUp()
{
// SetUpでデフォルトのFrameProviderを差し替える
_fakeFrameProvider = new FakeFrameProvider();
_fakeTimeProvider = new FakeTimeProvider();
ObservableSystem.DefaultFrameProvider = _fakeFrameProvider;
ObservableSystem.DefaultTimeProvider = _fakeTimeProvider;
}
[Test]
public void OutputDelayFrameのテスト()
{
using var target = new TestTargetObject();
// LiveListに変換
using var liveList = target.OutputDelayFrame.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
target.Publish(1);
target.Publish(2);
target.Publish(3);
// 30フレーム経過するまで出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 29フレーム経過させる
_fakeFrameProvider.Advance(29);
// まだ出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// さらに1フレーム経過させる
_fakeFrameProvider.Advance(1);
// 30フレーム経過したので出力されているはず
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
[Test]
public void OutputDelayのテスト()
{
using var target = new TestTargetObject();
// LiveListに変換
using var liveList = target.OutputDelay.ToLiveList();
// 現時点で出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
target.Publish(1);
target.Publish(2);
target.Publish(3);
// 3秒経過するまで出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// 2秒進める経過させる
_fakeTimeProvider.Advance(TimeSpan.FromSeconds(2));
// まだ出力はゼロ
CollectionAssert.AreEqual(Array.Empty<int>(), liveList);
// さらに1秒進める
_fakeTimeProvider.Advance(TimeSpan.FromSeconds(1));
// 計3秒経ったので値が発行された
CollectionAssert.AreEqual(new[] { 1, 2, 3 }, liveList);
}
}
}
[新機能] ObservableSystem
R3では全体のObservableの挙動を設定するObservableSystemという機能が追加されています。
// Observable内で発行された例外が処理されなかったときに最終的に到達するハンドラを登録できる
ObservableSystem.RegisterUnhandledExceptionHandler(ex => Debug.LogException(ex));
// デフォルトのTimeProvider/FrameProviderを指定できる(後述)
ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
R3で廃止/変更された機能(UniRxからの移行時に代替が必要なもの)
[変更] Schedulerが廃止
SchedulerとはObservableにおける「時間」「タイミング」「実行コンテキスト」を制御するための機構です。R3ではこのSchedulerは廃止されており、代わりにTimeProviderおよびFrameProviderで制御される仕組みとなっています。
(TimeProviderは.NET 8で追加された「時間」を抽象化するための機構です。.NET 8といいつつも単品パッケージとして公開されており、Nugetから導入することで2024年現在のUnityでも利用することができます)
「R3.Unity」を導入している場合はUnityTimeProvider/UnityFrameProviderという実装が追加されます。これを指定することでUnityの挙動に合わせた時間管理をR3で行うことができます。
TimeProvider/FrameProviderの指定
Observable定義時にTimeProvider/FrameProviderを引数で指定できるものがあります。
Observableごとに挙動を変更したい場合は指定してください。
using System;
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class TimeProviderSample : MonoBehaviour
{
private void Start()
{
// Observable.EveryUpdateは一定のフレーム間隔でメッセージを発行する
// どのフレームタイミングでメッセージ発行するかを指定できる
Observable
.EveryUpdate(UnityFrameProvider.FixedUpdate, destroyCancellationToken)
.Subscribe(_ =>
{
// FixedUpdateと同じタイミングで実行される
});
// Observable.Timerは指定した時間が経過したらメッセージを発行する
// 時間の計測をUpdate()のタイミングで実施する
Observable
.Timer(TimeSpan.FromSeconds(1),
UnityTimeProvider.Update,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
}
}
}
とくにUnityTimeProviderですが、「Time.scaleの影響を受けるもの」「Time.scaleの影響を受けないもの」「Unityの時間とは独立して時間計測するもの」の3パターンの指定ができます。
用途によって使い分けましょう。
Observable
.Timer(TimeSpan.FromSeconds(1),
// Time.scaleの影響を受ける
// Unityが動作を停止していた場合は時間が進まない
UnityTimeProvider.Update,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
Observable
.Timer(TimeSpan.FromSeconds(1),
// Time.scaleの影響を受けないが、
// Unityが動作を停止していた場合は時間が進まない
UnityTimeProvider.UpdateIgnoreTimeScale,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
Observable
.Timer(TimeSpan.FromSeconds(1),
// Unityの挙動とは独立した時間計測
// Unityが動作を停止していた場合でも時間が進む
UnityTimeProvider.UpdateRealtime,
destroyCancellationToken)
.Subscribe(_ =>
{
// 1秒後に実行される
});
デフォルトの指定
ObservableSystem.DefaultTimeProviderおよびObservableSystem.DefaultFrameProviderを設定することで、グローバルで用いるProviderのデフォルト値を変更することができます。
// デフォルトは両者ともに「Update」
ObservableSystem.DefaultTimeProvider = UnityTimeProvider.Update;
ObservableSystem.DefaultFrameProvider = UnityFrameProvider.Update;
R3.Unityでは起動時に自動的にUpdateが指定されるようになっています。変更したい場合は手動で上書きしてください。
[変更] CurrentThreadSchedulerが使えない
前述の廃止されたSchedulerのひとつにCurrentThreadSchedulerというものがありました。
このCurrentThreadSchedulerは「Observable中でメッセージ発行が再帰したときに、メッセージ順序を末尾再帰に変換してくれる」という性質がありました。
この「再帰したときに末尾再帰へ変換する」という挙動をR3で再現したい場合はTrampoline()というオペレータを使ってください。
再帰したときにメッセージ順序が変になる例
再帰したときにメッセージ順序が変になる例
Observableのメッセージが再帰する例として「オンラインゲームでのプレイヤの参加通知」というものを挙げます。
- プレイヤーが新たに接続してきたときに
PlayerJoined通知を発行する -
PlayerJoined通知を受けて、ゲームのマネージャがそのプレイヤーをゲームに参加させる - プレイヤーがゲームに正常に参加したときに、
PlayerAddedToTeam通知を発行する
という挙動を考えてみます。このときに発行されるイベントは「PlayerJoined → PlayerAddedToTeam」という順序であってほしいです。(PlayerAddedToTeamイベントが先に発行されてしまうと処理が壊れる可能性がある)
ですが、これをそのまま実装してしまうとメッセージ順序の入れ替わりが発生してしまいます。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class PlayerEventSample : MonoBehaviour
{
// ゲーム中で扱うイベント
private enum GameEvent
{
PlayerJoined,
PlayerAddedToTeam
}
// ゲームイベントを通知するためのSubject
private readonly Subject<GameEvent> _subject = new();
private void Start()
{
_subject.AddTo(this);
// イベント通知をいろんな場所で購読して処理している(というイメージ)
SubscriberA(_subject);
SubscriberB(_subject);
SubscriberC(_subject);
// 新しくプレイヤーが接続してきたことを通知する
_subject.OnNext(GameEvent.PlayerJoined);
}
// 購読者A
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
private void SubscriberA(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"A: {e}");
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
// その後、PlayerAddedToTeamイベントを通知する(というイメージ)
if (e == GameEvent.PlayerJoined)
{
_subject.OnNext(GameEvent.PlayerAddedToTeam);
}
}).RegisterTo(destroyCancellationToken);
}
// 購読者B
// イベント通知をログに出す
private void SubscriberB(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"B: {e}");
}).RegisterTo(destroyCancellationToken);
}
// 購読者C
// イベント通知をログに出す
private void SubscriberC(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"C: {e}");
}).RegisterTo(destroyCancellationToken);
}
}
}
A: PlayerJoined
A: PlayerAddedToTeam
B: PlayerAddedToTeam
C: PlayerAddedToTeam
B: PlayerJoined
C: PlayerJoined
「SubscriberA」は意図したとおりの順序になっていますが、BとCでは後から発行したPlayerAddedToTeamが先に到達してしまいました。この順序の入れ替わりはAのSubscribe()内で同じObservableに対してメッセージ発行をしてしまった(再帰した)ために起きました。
この再帰時の順序の入れ替わりを防止し、「先に発行したメッセージは先に到達する」という状況(末尾再帰)に変換できるのがTrampolineオペレータです。(UniRxではObserveOn(Scheduler.CurrentThread)を使うことで順序の入れ替わりを防ぐことができた)
Trampolineで末尾再帰にする
Trampolineで末尾再帰にする
Trampoline()を挟むことで、再帰したときに末尾再帰な形へとメッセージ順序が調整されます。なおShare()はHot変換用のオペレータで、Trampolineを通した1本のストリームを全員で共有させるために必要となります。
参考資料 : 【Reactive Extensions】 Hot変換はどういう時に必要なのか?
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class TrampolineSample : MonoBehaviour
{
// ゲーム中で扱うイベント
private enum GameEvent
{
PlayerJoined,
PlayerAddedToTeam
}
// ゲームイベントを通知するためのSubject
private readonly Subject<GameEvent> _subject = new();
private void Start()
{
_subject.AddTo(this);
// Trampoline()で再帰したときに末尾再帰になる
// Share()はTrampoline()を通した同じストリームを全員で共有するために必要
var observable = _subject.Trampoline().Share();
// イベント通知をいろんな場所で購読して処理している(というイメージ)
SubscriberA(observable);
SubscriberB(observable);
SubscriberC(observable);
// 新しくプレイヤーが接続してきたことを通知する
_subject.OnNext(GameEvent.PlayerJoined);
}
// 購読者A
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
private void SubscriberA(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"A: {e}");
// PlayerJoinedイベントを受け取ったら、ゲームに参加させる処理を行う
// その後、PlayerAddedToTeamイベントを通知する(というイメージ)
if (e == GameEvent.PlayerJoined)
{
_subject.OnNext(GameEvent.PlayerAddedToTeam);
}
}).RegisterTo(destroyCancellationToken);
}
// 購読者B
// イベント通知をログに出す
private void SubscriberB(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"B: {e}");
}).RegisterTo(destroyCancellationToken);
}
// 購読者C
// イベント通知をログに出す
private void SubscriberC(Observable<GameEvent> observable)
{
observable.Subscribe(e =>
{
// 受け取ったイベントをログに出す
Debug.Log($"C: {e}");
}).RegisterTo(destroyCancellationToken);
}
}
}
A: PlayerJoined
B: PlayerJoined
C: PlayerJoined
A: PlayerAddedToTeam
B: PlayerAddedToTeam
C: PlayerAddedToTeam
メッセージ順序が「先に発行した順」へと調整されました。
[変更] 例外のハンドリングが変更
UniRxのObservableでは例外発生時はOnErrorメッセージが発行されObservableは停止していました。
しかしR3では例外発生時の挙動が変更されており、発生した例外はOnErrorResumeメッセージとして扱われることになっています。このOnErrorResumeメッセージは「例外を通知するがObservableは停止せずにそのまま動作を継続する」という挙動をします。
using var subject = new Subject<string>();
// 文字列をintに変換する
subject
.Select(int.Parse) // パース失敗時に例外が発生する
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
subject.OnNext("123"); // 出力は OnNext(123)
subject.OnNext("xyz"); // 出力は OnErrorResume: System.FormatException
subject.OnCompleted(); // 出力は OnCompleted: Success
UniRxで必要だった「エラー発生時はRetry()などのオペレータを使ってObservableの再構築をする」という処理が不要となりました。
そのためR3ではRetry()オペレータは廃止されています。
OnErrorResumeAsFailure()で停止させる
もしOnErrorResumeメッセージ発生時に従来どおりObservableを停止させたい場合はOnErrorResumeAsFailure()オペレータを使いましょう。
subject
// パース失敗時に OnErrorResume が発行される
.Select(int.Parse)
// OnErrorResume を OnCompleted(Exception) に変換する
.OnErrorResumeAsFailure()
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
Catch()の挙動
例外発生時のハンドリングを行うことができるCatch()オペレータですが、こちらが発火する対象はOnCompleted(Exception)になっています。
// 文字列をintに変換する
subject
// パース失敗時に OnErrorResume が発行される
.Select(int.Parse)
// OnErrorResume を OnCompleted(Exception) に変換する
.OnErrorResumeAsFailure()
// CatchはOnCompleted(Exception)に反応する
.Catch<int, FormatException>(ex =>
{
Debug.LogError(ex);
// 例外発生時に差し替えるObservable
return Observable.Empty<int>();
})
.Subscribe(
x => Debug.Log(x),
ex => Debug.LogError($"OnErrorResume: {ex}"),
result => Debug.Log($"OnCompleted: {result}"));
subject.OnNext("123"); // 出力は OnNext(123)
subject.OnNext("xyz"); // 出力は OnCompleted: Success
subject.OnCompleted(); // 到達しない
[変更] ObservableのAwaiterが無い
UniRxではObservableを直接async/awaitで待つことができましたがR3ではなくなりました。
同等の機能を再現したい場合はLastAsync()を使ってください。
var result = await Observable.Range(1, 10).LastAsync();
[変更] First/FirstOrDefault/Last/LastOrDefaultが無い
オペレータであるFirst/FirstOrDefault/Last/LastOrDefault/Single/SingleOrDefaultがなくなりました。
代わりにそれぞれ-Asyncがついた、Task<T>化する機能が追加されています。
private async UniTaskVoid ExampleAsync(CancellationToken ct)
{
// 最後の値を待機する
var lastValue = await Observable.Range(1, 10).LastAsync(cancellationToken: ct);
// 最初の値を待機する
var firstValue = await Observable.Range(1, 10).FirstAsync(cancellationToken: ct);
}
もしオペレータとしての動作を期待する場合はTake(1)/TakeLast(1)を使いましょう。
// Firstとだいたい一緒(ただしEmptyの時にエラーにはならない)
Observable.Range(1, 10)
.Take(1)
.Subscribe(x => Debug.Log(x));
// Lastとだいたい一緒(ただしEmptyの時にエラーにはならない)
Observable.Range(1, 10)
.TakeLast(1)
.Subscribe(x => Debug.Log(x));
// FirstOrDefaultとだいたい一緒
Observable.Empty<int>()
.Take(1).DefaultIfEmpty()
.Subscribe(x => Debug.Log(x));
// LastOrDefaultとだいたい一緒
Observable.Empty<int>()
.TakeLast(1).DefaultIfEmpty()
.Subscribe(x => Debug.Log(x));
// オペレータとしてのSingleは再現できないかも
(余談ですが、-Asyncなメソッドは他にも結構追加されています。MaxAsync()とかAllAsync()とかContainsAsync()とか)
[変更] BehaviorSubject/AsyncSubjectが無い
BehaviorSubjectとAsyncSubjectはR3では実装されていません。
BehaviorSubjectは挙動がReactivePropertyとほぼ同じなため、ReactivePropertyを代替として使いましょう。
AsyncSubjectについてはUniTaskCompletionSourceがほぼ同じ挙動をするのでUniTaskを導入してこちらを使うとよいかも。
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class AlternativeAsyncSubject : MonoBehaviour
{
private void Start()
{
// UniTaskCompletionSourceをAsyncSubjectの代わりに使ってみる
var utcs = new UniTaskCompletionSource<int>();
// OnNext(100) + OnCompleted() と同じ
utcs.TrySetResult(100);
// OnError(new Exception()) と同じ
utcs.TrySetException(new Exception());
// OnError(new OperationCanceledException()) とだいたい同じ
utcs.TrySetCanceled();
// -----------------------------------------------
// ラムダ式で待ち受け。ただしこれだとキャンセルができない
utcs.Task.ContinueWith(result => Debug.Log(result));
// async/awaitで待ち受け
// AttachExternalCancellationでキャンセルを外付けしておくとよい
UniTask.Void(async () =>
{
var result = await utcs.Task.AttachExternalCancellation(destroyCancellationToken);
Debug.Log(result);
});
// Observableとして扱いたいなら一度ValueTaskを介して変換する
// UniTask -> ValueTask -> R3.Observable
utcs.Task
.AsValueTask()
.ToObservable()
.Subscribe(x => Debug.Log(x))
.AddTo(this);
}
}
}
UniTask -> R3.Observable の変換が頻発するなら拡張メソッドを用意してもいいかも。
UniTask->R3.Observable変換の拡張メソッド例
using Cysharp.Threading.Tasks;
using R3;
namespace UniTaskR3ExtensionsSample
{
public static class UniTaskR3Extensions
{
public static Observable<T> ToR3Observable<T>(this UniTask<T> task)
{
return task.AsValueTask().ToObservable();
}
public static Observable<Unit> ToR3Observable(this UniTask task)
{
return task.AsValueTask().ToObservable();
}
public static Observable<T> ToR3Observable<T>(this UniTaskCompletionSource<T> tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<Unit> ToR3Observable(this UniTaskCompletionSource tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<T> ToR3Observable<T>(this AutoResetUniTaskCompletionSource<T> tcs)
{
return tcs.Task.ToR3Observable();
}
public static Observable<Unit> ToR3Observable(this AutoResetUniTaskCompletionSource tcs)
{
return tcs.Task.ToR3Observable();
}
}
}
var utcs = new UniTaskCompletionSource<int>();
utcs.ToR3Observable()
.Subscribe(x => Debug.Log(x))
.AddTo(this);
[変更] IReactiveProperty<T>/IReadOnlyReactiveProperty<T>インタフェースが無い
ReactiveProperty<T>のインタフェース定義であるIReactiveProperty<T>/IReadOnlyReactiveProperty<T>がR3では存在しません。
もし読み取り専用でReactiveProperty<T>を公開したいときは、IReadOnlyReactiveProperty<T>インタフェースの代わりにReadOnlyReactiveProperty<T>にキャストして公開しましょう。
using R3;
namespace Samples.R3Sample
{
public class ReactivePropertySample
{
// こっちが本体
private readonly ReactiveProperty<int> _rp = new();
// 公開するプロパティ
public ReadOnlyReactiveProperty<int> Value => _rp;
}
}
[変更] ReactiveCollection/ReactiveDictionaryが無い
かなり便利だったReactiveCollection<T>およびReactiveDictionary<TKey, TValue>はR3に未実装です(v0.1.23現在)。その代わり、他にもリアクティブなコレクション構造が追加されているObservableCollectionsという別のライブラリがあるのでこちらを使いましょう。むしろObservableCollectionsの方がリアクティブなコレクション実装が増えていて便利です。
なお、R3と組み合わせるためにはObservableCollectionsとObservableCollections.R3の2つのパッケージを導入する必要があります。NugetForUnityから導入してください。
using ObservableCollections;
using UnityEngine;
using R3;
namespace Samples.R3Sample
{
public class AlternativeReactiveDictionary : MonoBehaviour
{
private void Start()
{
// ReactiveDictionaryの代わり
var dic = new ObservableDictionary<int, string>();
// 新しい要素が追加されたイベント
dic.ObserveAdd(destroyCancellationToken)
.Subscribe(collectionAddEvent =>
{
var (key, value) = collectionAddEvent.Value;
Debug.Log($"Add [{key}]={value}");
});
// 要素が上書きされたイベント
dic.ObserveReplace(destroyCancellationToken)
.Subscribe(replaceEvent =>
{
var key = replaceEvent.NewValue.Key;
var newValue = replaceEvent.NewValue.Value;
var oldValue = replaceEvent.OldValue.Value;
Debug.Log($"Replace [{key}]={oldValue} -> {newValue}");
});
dic[1] = "hoge";
dic[2] = "fuga";
dic[1] = "piyo";
}
}
}
Add [1]=hoge
Add [2]=fuga
Replace [1]=hoge -> piyo
[変更] コルーチンとの連携機能が無い
UniRxではObservable.FromCoroutineやCoroutine.ToObservableなど、Unityコルーチンと連携した機能がありました。
ですがこれらの機能はR3ではオミットされています。
その代わりasync/awaitとの連携がUniRxと比べ強化されているのでコルーチンの代わりに「UniTask + async/await」を使いましょう。
// async/awaitを用いて手続き的に値を発行するObservableを作れる
Observable.Create<int>(async (observer, ct) =>
{
observer.OnNext(1);
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
observer.OnNext(2);
await UniTask.Delay(TimeSpan.FromSeconds(1), cancellationToken: ct);
observer.OnNext(3);
observer.OnCompleted();
})
.Subscribe(x => Debug.Log(x))
.RegisterTo(cancellationToken);
[変更] AsyncReactiveCommandが無い
UniRxにはAsyncReactiveCommandというUIと組み合わせると便利な機能がありましたが、R3では消滅しています。
(またToReactiveCommandも消えているが、こっちは大した実装じゃないのでUniRxを参考に自前で拡張メソッドを実装しちゃえばよさそう)
AsyncReactiveCommandはUIの制御と組み合わせ、「非同期処理が実行中は関連するボタン制御を無効化する」といった用途に使うことができました。
これをAsyncReactiveCommand無しで再現するならSubscribeAwaitを使うとよいかと。
AsyncReactiveCommandでのUI制御をSubscribeAwaitで代替してみる
AsyncReactiveCommandでのUI制御をSubscribeAwaitで代替してみる
SubscribeAwaitでReactiveProperty<bool>を制御し、同時に複数のボタンのInteractableを切り替えることでAsyncReactiveCommandと似た挙動を再現できます。
using System;
using System.Threading;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
using Random = UnityEngine.Random;
namespace Samples.R3Sample
{
public class AlternativeAsyncReactiveCommandSample : MonoBehaviour
{
// 各種ボタン
[SerializeField] private Button _buttonA;
[SerializeField] private Button _buttonB;
[SerializeField] private Button _buttonC;
// 状態表示用のテキスト
[SerializeField] private Text _text;
// ボタン制御用のReactiveProperty(ゲート)
private readonly ReactiveProperty<bool> _gate = new(true);
private void Start()
{
_gate.AddTo(this);
// ゲートがfalseのときはボタンを押せない状態にする
_gate.SubscribeToInteractable(_buttonA).RegisterTo(destroyCancellationToken);
_gate.SubscribeToInteractable(_buttonB).RegisterTo(destroyCancellationToken);
_gate.SubscribeToInteractable(_buttonC).RegisterTo(destroyCancellationToken);
// 状態を可視化する
_gate.Select(x => $"Gate={x}").SubscribeToText(_text);
// ボタンAが押されたときの処理
_buttonA.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodAAsync(ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
// ボタンBが押されたときの処理
_buttonB.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodBAsync(ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
// ボタンCが押されたときの処理
_buttonC.OnClickAsObservable()
.Where(_ => _gate.Value)
.SubscribeAwait(async (_, ct) =>
{
// 非同期処理にゲートを連動させる
await GateControlAsync(MethodCAsync (ct));
}, AwaitOperation.Drop)
.RegisterTo(destroyCancellationToken);
}
// 非同期処理実行中はゲートを閉める
private async UniTask GateControlAsync(UniTask task)
{
_gate.Value = false;
try
{
await task;
}
finally
{
_gate.Value = true;
}
}
// なにか各種非同期な処理があったとする
private async UniTask MethodAAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
private async UniTask MethodBAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
private async UniTask MethodCAsync(CancellationToken ct)
{
await UniTask.Delay(TimeSpan.FromSeconds(Random.Range(0.5f, 2f)), cancellationToken: ct);
}
}
}
[変更] MessageBrokerが無い
インメモリpub/subのであるMessageBrokerがUniRxには存在しましたがR3ではオミットされています。
代替としては、これの上位互換的な存在であるMessagePipeを使うとよいでしょう。
なお、この記事の執筆時点では MessagePipe -> R3.Observable に直接変換するメソッドは存在していませんでした。将来的に実装される可能性があります。一応、もしR3.Observableに変換したい場合はSystem.IObservableへの変換を一度挟むことで実現できます。
private void Sample<T>(ISubscriber<T> subscriber)
{
subscriber // MessagePipe.ISubscriber<T>
.AsObservable() // System.IObservable<T>
.ToObservable() // R3.Observable<T>
.Subscribe();
}
ただこのクッションが気持ち悪いという人は拡張メソッドを用意するとよさそう。
シンプルな拡張メソッドの例
using MessagePipe;
using R3;
namespace My.MessagePipe.R3.Ext
{
public static class MessagePipeR3Ext
{
public static Observable<TMessage> AsR3Observable<TMessage>(this ISubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(filters).ToObservable();
}
public static Observable<TMessage> AsR3Observable<TMessage>(this IBufferedSubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(filters).ToObservable();
}
public static Observable<TMessage> AsR3Observable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber,
TKey key,
params MessageHandlerFilter<TMessage>[] filters)
{
return subscriber.AsObservable(key, filters).ToObservable();
}
}
}
ただしこの書き方だと2回変換が挟まってちょっと無駄なので、そこを気にするならMessagePipeのObservable変換コードをR3用に書き直したものを用意するでもいいかも。
MessagePipeから直接R3に変換する拡張メソッドの例
// MessagePipe
// MIT License
// Copyright (c) 2021 Cysharp, Inc.
// https://github.com/Cysharp/MessagePipe/blob/master/LICENSE
using System;
using MessagePipe;
using R3;
namespace My.MessagePipe.R3.Ext
{
public static class MessagePipeR3Ext
{
public static Observable<TMessage> AsR3Observable<TMessage>(this ISubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableSubscriber<TMessage>(subscriber, filters);
}
public static Observable<TMessage> AsR3Observable<TMessage>(this IBufferedSubscriber<TMessage> subscriber,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableBufferedSubscriber<TMessage>(subscriber, filters);
}
public static Observable<TMessage> AsR3Observable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber,
TKey key,
params MessageHandlerFilter<TMessage>[] filters)
{
return new ObservableSubscriber<TKey, TMessage>(key, subscriber, filters);
}
}
internal sealed class ObservableSubscriber<TKey, TMessage> : Observable<TMessage>
{
readonly TKey key;
readonly ISubscriber<TKey, TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableSubscriber(TKey key,
ISubscriber<TKey, TMessage> subscriber,
MessageHandlerFilter<TMessage>[] filters)
{
this.key = key;
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(key, new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObservableSubscriber<TMessage> : Observable<TMessage>
{
readonly ISubscriber<TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableSubscriber(ISubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
{
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObservableBufferedSubscriber<TMessage> : Observable<TMessage>
{
readonly IBufferedSubscriber<TMessage> subscriber;
readonly MessageHandlerFilter<TMessage>[] filters;
public ObservableBufferedSubscriber(IBufferedSubscriber<TMessage> subscriber,
MessageHandlerFilter<TMessage>[] filters)
{
this.subscriber = subscriber;
this.filters = filters;
}
protected override IDisposable SubscribeCore(Observer<TMessage> observer)
{
return subscriber.Subscribe(new ObserverMessageHandler<TMessage>(observer), filters);
}
}
internal sealed class ObserverMessageHandler<TMessage> : IMessageHandler<TMessage>
{
readonly Observer<TMessage> observer;
public ObserverMessageHandler(Observer<TMessage> observer)
{
this.observer = observer;
}
public void Handle(TMessage message)
{
observer.OnNext(message);
}
}
}
[変更] UniRx.ToolKit.ObjectPoolが無い
UniRxにはオブジェクトプールの機構がありましたが、R3にはありません。
代替としては、「Unity公式のObjectPool」や「uPools」などが挙げられます。
[変更] Debugオペレータ
UniRxにはDebugというオペレータがあり、Observableのメッセージを表示してくれる便利機能がありました。ただR3には存在していないので、必要ならば拡張メソッドとして自分で用意しておくと便利です。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public static class ObservableDebugExt
{
public static Observable<T> Debug<T>(this Observable<T> source, string label = null)
{
#if DEBUG
var l = (label == null) ? "" : $"[{label}] ";
return source.Materialize()
.Do(
onNext: x => UnityEngine.Debug.Log(l + x),
onDispose: () => UnityEngine.Debug.Log($"{l}OnDispose"),
onSubscribe: () => UnityEngine.Debug.Log($"{l}OnSubscribe")
)
.Dematerialize();
#else
return source;
#endif
}
public static Observable<T> Debug<T>(this Observable<T> source, ILogger logger)
{
#if DEBUG
return source.Materialize()
.Do(
onNext: x => logger.Log(x.ToString()),
onDispose: () => logger.Log("OnDispose"),
onSubscribe: () => logger.Log("OnSubscribe")
)
.Dematerialize();
#else
return source;
#endif
}
}
}
Observable.Range(0, 3)
.Debug("Observable.Range")
.Subscribe();
[Observable.Range] OnSubscribe
[Observable.Range] 0
[Observable.Range] 1
[Observable.Range] 2
[Observable.Range] Success
[Observable.Range] OnDispose
その他の細かい変更点
Select/Where/Subscribe/Doで外部変数を渡せるようになった
Select/Where/Subscribeで外部変数をObservable構築時に渡せることができるようになりました。これによってラムダ式から外部変数のキャプチャによるGCアロケートを避けることができます。
using R3;
using UnityEngine;
namespace Samples
{
public class StateSample : MonoBehaviour
{
[SerializeField] private float _fallThreshold = -10;
private void Start()
{
Observable.EveryValueChanged(transform, t => t.position, destroyCancellationToken)
// ラムダ式から外部変数をキャプチャさせない
.Where(_fallThreshold, (position, threshold) => position.y < threshold)
.Subscribe(_ => Destroy(gameObject));
}
}
}
Observable.EveryUpdateがキャンセル可能に
UniRxにもあったObservable.EveryUpdateですが、CancellationTokenの指定が可能となりました。(あとUnityFrameProviderでタイミングの指定もできます)
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class EveryUpdateSample : MonoBehaviour
{
private void Start()
{
// このGameObjectに寿命が連動したObservableになる
// 実質 this.UpdateAsObservable()
Observable
.EveryUpdate(destroyCancellationToken)
.Subscribe(_ => OnUpdate());
}
private void OnUpdate()
{
}
}
}
EveryValueChangedの記法変更
指定したオブジェクトを毎フレーム差分チェックし、変化があったら通知するという機能であったEveryValueChangedですが、記法が変わりました。
using R3;
using UnityEngine;
namespace Samples.R3Sample
{
public class EveryValueChangedSample : MonoBehaviour
{
private void Start()
{
// UniRxではこうだったが
// transform.ObserveEveryValueChanged(t => t.position).Subscribe(p => Debug.Log(p));
// R3での記法はこうなった
Observable
.EveryValueChanged(transform, t => t.position)
.Subscribe(p => Debug.Log(p));
// CancellationTokenを渡すこともできる
Observable
.EveryValueChanged(transform, t => t.rotation, destroyCancellationToken)
.Subscribe(r => Debug.Log(r));
}
}
}
ReactivePropertyにIEqualityComparerが指定可能に
地味な変更点ですが、ReactiveProperty<T>のコンストラクタでIEqualityComparer<T>が指定できるようになりました。
UniRxでは同値判定のカスタマイズができなかったのでこの変更は個人的には嬉しいです。
ReactivePropertyのSkipLatestValueOnSubscribe がなくなった
ReactiveProperty<T>を最初にSubscribeしたときの値の発行をスキップするSkipLatestValueOnSubscribe()がなくなりました。
といっても処理としてはSkip(1)を挟むだけなので、必要ならSkip(1)を使いましょう。
またSetValueAndForceNotify()もなくなっていますが、こちらはOnNext()で代替できます。
まとめ
R3とUniRxは似ているものの根本の思想が異なるライブラリであるため、かなり多くの差分が存在します。
そのため、すべてをまとめきれてはいないとは思いますがご容赦ください。
以下ポエム
個人的にR3のどこが気に入ったか
SubscribeAwait-
FakeTimeProvider/FakeFrameProviderによるユニットテスト Observable Tracker
UniRxでできなかったこと/問題になっていたことを解消しているので、このあたりはだいぶ気に入りました。
UniRxからR3に乗り換えるべきかどうか
個人的な意見としては、「可能であるならUniRxからR3に乗り換えるべき」だと考えます。
というのもR3の方が最新C#の機能を取り込み、機能としてもパフォーマンスとしてもかなり洗練されたライブラリに仕上がっています。
UniRxも決して悪いライブラリではないのですが、「R3かUniRxかどっちを取るか」と言われたら迷うこと無く「R3」です。
ただしR3はフル機能を活かすならUnity 2022.2以降である必要があるため、プロジェクトによってはそこが障害となる可能性があります(その場合はUniRxを継続して利用し続けるしかないです)
Unityバージョンの問題をクリアしたとしても、UniRxからR3は挙動が大きく異なっている部分(とくにエラーハンドリング周り)があるため置き換えるのは大変かもしれません。そのため「現状UniRxで特に困ってない」というのであれば無理に載せ替えないのも選択肢としてはありだと思います。
UniTaskとasync/awaitがすでにあって便利なのに、R3を導入するメリットはあるのか
メリットはあります。
そもそも「UniTask + async/await」と「R3」は扱う領域が異なります。
Rxは「PUSH型のインメモリなメッセージング機構」「LINQ to Events」としての機能が非常に強力であり、async/awaitでは代替不可能です。
PUSH型のイベントメッセージングが必要な場面では「R3」を使い、非同期処理やPULL型の制御を行う場面では「UniTask, async/await」を使う、といった様にこの2つは併用することが推奨されると考えます。むしろR3はasync/awaitとの併用を前提に作られているので、組み合わせて使いましょう!









