今回の内容
C#向けの新しいReactiveExtensions(Rx)ライブラリ「R3」について、従来のRxとの変更点を紹介します。
R3について
リポジトリ
開発元はUniTask
やMagicOnion
などを公開しているCysharp社であり、メインの開発者はUniRx
の作者でもあるneuecc氏です。
どんなライブラリか
「R3」はReactiveExtensions(Rx)を現代に合わせてより洗練した形に再定義/再実装した、C#用のライブラリです。
というのも、Rxは10年以上前に登場した概念であり、当時はまだ非同期処理に対するベストプラクティスが模索されていたタイミングでした。
そのためRxは「LINQをEventにも適用できる」という側面も持ちつつ「非同期処理にも用いることができる」という2つの側面をもった、少しややこしいライブラリとして登場しました。
そして2024年現在においては、async/await
が非同期処理のベストプラクティスとされており、Rxをその用途に使うことはほぼ無い状況となっています。
ではRxは完全に廃れたのかというと、そうではありません。「LINQをEventにも適用できる(LINQ to Events
)」という概念、PUSH型のメッセージング機構としてRxを捉えると有用性は現代でも存在します。
そのためRxライクな概念(Observable
的なもの)は言語を超えて今でも利用されており、UniRxも人気の高いライブラリとして君臨しています。
しかしRxという概念とasync/await
は近い時代に産まれたものの、お互いに調和が取れた形にはなっていません。
そのため「Rxとasync/await
を両立できればいいのに…」と感じる場面が多くありました。
そして今回紹介する「R3」はそのRxの原点である「LINQ to Events
」という概念を現代に合わせて再考慮し、かつasync/await
との調和も取れたメッセージングライブラリとして作られています。
またRxと比較してパフォーマンスが出るようにチューニングされており、かつC#の最新機能を多く取り入れています。
C#における汎用ライブラリとしての定義
Rx(dotnet/reactive
)はC#向けのライブラリではありますが、C#を用いているフレームワーク(Unityなど)でそのまま使えるわけではありませんでした。
各フレームワーク向けのチューニングがされておらず、パフォーマンス面で懸念があったり、そもそも導入しても上手く活用できませんでした。
(そのため、Unity向けのRx実装として「UniRx」が専用で作られるなどしていた)
一方、R3は本体である「コアモジュール」と、各フレームワークでR3を動作可能にする「拡張モジュール」から構成されています。
コアモジュールはC#の環境で汎用に動くように(しかも徹底的にチューニングして)作られており、各フレームワークごとの都合は拡張モジュールで吸収するという仕組みになっています。
これにより利用者はどんなフレームワークであったとしても、この強力な次世代RxであるR3を利用可能になりました。
対応フレームワーク
- Unity
- Godot
- Avalonia
- WPF
- WinForms
- Stride,
(2024年2月時点)
従来のRx(dotnet/reactiveやUniRx等)との違い
Observableが再定義されている
R3ではObservable
の概念が再定義されています。
(Observable
はRxの中核である、PUSH型メッセージングを扱う機構/オブジェクト)
Rxで用いられていたObservable
は使いにくい場面や、そもそもパフォーマンスが出せない仕組みになっていました。
R3ではこの既存のObservable
の概念を破壊的に変更することにより、利便性とパフォーマンスを向上させています。
メソッド定義の変更
まず既存のRxでは、Observable
はSystem
定義のインタフェースに依存していました。
namespace System;
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
一方でR3はこのSystem定義のインタフェースに依存していません。
R3が新たに定義したObservable
(抽象クラス)を用いており、メソッド定義や挙動が変更されています。
全体を抜粋するとややこしいので、public
メソッドのみを抽出するとR3でのObservable
/Observer
は次の定義となっています。
namespace R3;
// 必要な部分だけ取り出しているので、C#の文法としては正しくないです
// 重要なのはどういうメソッド定義がされているか
public abstract class Observable<T>
{
public IDisposable Subscribe(Observer<T> observer);
}
public abstract class Observer<T> : IDisposable
{
public void OnNext(T value);
public void OnErrorResume(Exception error);
public void OnCompleted(Result result);
}
(引用元)
挙動がどう変わったか
// Rx
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
// R3
public abstract class Observer<T> : IDisposable
{
public void OnNext(T value);
public void OnErrorResume(Exception error);
public void OnCompleted(Result result);
}
Observer
の定義を比較すると、OnError
とOnCompleted
の挙動が変更されています。
既存のRxでは「例外」が発生したときはOnError
メッセージが発行されていました。そしてこのOnError
メッセージが発行されたとき、「Observable
は動作を停止する」というルールになっていました。
しかし、例外が出ただけでイベント購読が止まってしまっては困る場面が多々あります。例外が出て止まってしまった場合、再開するには再度Observable
を再構築してインスタンス化する必要があります。これはパフォーマンス面でも難がありました。
その挙動がR3では次のように変更されています。
- 例外の通知ができた上で
Observable
を停止させない「OnErrorResume()
」が定義された - もし例外発行して停止したい場合は「
OnCompleted()
」で通知できる
純粋に「例外が起きた」ということを通知し、Observable
の寿命には影響を与えないOnErrorResume()
が追加されました。
そして既存のOnError()
とOnCompleted()
は統合され、OnCompleted()
発行時に停止理由を送ることが可能となりました。
対応表を作ると、次のようになります。
機能 | Rx | R3 | 備考 |
---|---|---|---|
イベントメッセージの通知 | OnNext(T value) |
OnNext(T value) |
OnNext は変更なし |
正常系でObservable を止める |
OnCompleted() |
OnCompleted() |
R3ではResult を指定しない場合はRxと同じ挙動になる |
例外を通知する & Observable を止める |
OnError(Exception error) |
OnCompleted(Result result) |
R3ではResult 型に例外を詰めることでRxのOnError の代替となる |
例外を通知する(Observable は止めない) |
(できない) | OnErrorResume(Exception error) |
R3にのみ登場 |
従来のRxでは、「エラー発生時にObservable
が止まることを防ぐことはできないので、Operator
を使ってがんばってストリームを復旧する」という方法しか取ることができませんでした。
一方のR3では逆に「エラー時は基本停止しないが、Operator
を使えば停止させることもできる」という方法になっています。
これはパフォーマンスの面でも、制御のしやすさの面でも、R3の挙動のほうが優れています。
すべてのObservableは「完了」する
また大きな挙動の違いとして、Observable
をDispose()
したときはOnCompleted
メッセージが発行されるように変更されています。
そのためSubject
のDispose()
時や、Observable.FromEvent
などにおいてもOnCompleted
メッセージの発行が行えるようになっています。
Schedulerの廃止
従来のRxにおいてObservable
の時間的な挙動を司る概念としてScheduler
というものがありました。
しかしこのScheduler
、性能面では微妙な部分が多かった上、扱い方によってはスレッドが止まってしまうなど、扱いに難がありました。
R3ではこのScheduler
は廃止され、かわりにTimeProvider
とFrameProvider
によって時間(非同期)の管理がされるようになりました。
これによってパフォーマンスが向上しただけでなく、UniRxの独自実装であった「フレーム」という単位で動作するOperator
が、UniRx外のC#が登場するあらゆる場面でも扱えるようになりました。
また同時にFakeTimeProvider
/FakeFrameProvider
も用意され、これらを用いることでObservable
のユニットテストが容易に書けるようになっています。
全体がasync/awaitと調和した挙動に
従来のRxはasync/await
の概念とは独立して考案され構築されたものでした。
そのため現在において非同期処理の主力であるasync/await
との接合は微妙であり、使い勝手はよくありませんでした。
一方でR3では「async/await
との調和」を意識して構築されています。
さまざまな場面において自然にasync/await
を差し込めたり、CancellationToken
による制御が可能となりました。
SubscribeやOperatorでasync/await
が使える
R3では一部のOperator
やSubscribe
をするときにasync/await
を併用することが可能になっています。
また非同期処理の実行中に次のメッセージが到達した場合の挙動はAwaitOperation
で変更することができます。
SubscribeAwait
SubscribeAwait
を用いることでSubscribe
時にasync/await
を利用できるようになりました。
また非同期処理の実行中に次のメッセージが到達した場合の挙動は、後述するAwaitOperation
により変更することができます。
private void SubscribeAwaitSample()
{
var subject = new R3.Subject<string>();
// Subscribe時にasync/awaitでメッセージをハンドリングできる
subject.SubscribeAwait(async (text, ct) =>
{
await Task.Delay(TimeSpan.FromSeconds(1), ct);
Console.WriteLine(text);
// await中に次のイベントが到達したときの挙動は変更可能
}, AwaitOperation.Sequential);
subject.OnNext("Hello!");
subject.OnNext("World!");
subject.OnCompleted();
}
Hello!
World!
SelectAwait/WhereAwait
Select
とWhere
に非同期版が用意されており、async/await
との併用が可能になっています。
こちらも後述するAwaitOperation
により挙動を変更することができます。
private static readonly HttpClient HttpClient = new HttpClient();
private void AwaitableSample(CancellationToken ct)
{
var subject = new R3.Subject<string>();
subject
.Select(x => new Uri(x))
// 要素が存在するかの事前チェック
// 並列度を指定して、最大2つまで同時に問い合わせる
.WhereAwait(HeadAsync, AwaitOperation.Parallel, maxConcurrent: 2)
// データのダウンロード
// 並列での実行を許可しない
.SelectAwait(DownloadAsync, AwaitOperation.Sequential)
.Subscribe(onNext: result =>
{
// ダウンロード結果の表示
Debug.Log(result.Length);
});
// 適当なリソースをダウンロード
subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/1.jpg");
subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/2.jpg");
subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/3.jpg");
subject.OnCompleted();
}
// 対象のURIをHEADしてみて要素が存在するか事前チェックする
private async ValueTask<bool> HeadAsync(Uri path, CancellationToken ct)
{
using var request = new HttpRequestMessage(new HttpMethod("HEAD"), path);
using var response = await HttpClient.SendAsync(request, ct);
return response.IsSuccessStatusCode;
}
// 対象のデータダウンロード
private async ValueTask<byte[]> DownloadAsync(Uri uri, CancellationToken ct)
{
using var response = await HttpClient.GetAsync(uri, ct);
return await response.Content.ReadAsByteArrayAsync();
}
非同期処理の挙動の調整:AwaitOperation
SubscribeAwait
/WhereAwait
/SelectAwait
はAwaitOperation
を指定することができます。
このAwaitOperation
により、非同期処理の実行中(await
の処理が終わる前)に次のメッセージが到達してしまったときの挙動を調整することができます。
AwaitOperation |
await 中に次のイベントが来たときの挙動 |
備考 |
---|---|---|
Sequential | 今実行中の処理を優先。余剰なイベントはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。 | |
Drop | 今実行中の処理を優先。余剰なイベントは無視してなかったことにする。 | |
Switch | 今実行中の非同期処理をキャンセル。 新しく到達したイベントの処理を優先して開始する。 | キャンセル処理はCancellationToken を使って自分で実装する必要がある。 |
Parallel | 新しく来たイベントを即座に処理する。処理が終わったものから早いもの勝ちで出力される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
SequentialParallel※ | 新しく来たイベントを即座に処理する。処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
ThrottleFirstLast | 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 |
ThrottleFirst とThrottleLast が合体した挙動 |
※ SequentialParallelはWhereAwait/SelectAwaitでのみ利用可
Debounce/ThrottleFirst/ThrottleLast
Debounce
/ThrottleFirst
/ThrottleLast
もasync/await
に対応しています。
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;
namespace Scenes
{
public class ThrottleFirstSample : MonoBehaviour
{
[SerializeField] private Button _button;
private float _waitTime = 1.0f;
private void Start()
{
// ボタンがクリックされたら一定時間ボタンイベントを無視する
// 無視する期間は実行することに伸びる
_button.OnClickAsObservable()
.ThrottleFirst(async (_, ct) =>
{
await UniTask.Delay(TimeSpan.FromSeconds(_waitTime), cancellationToken: ct);
})
.Subscribe(_ =>
{
Debug.Log("Clicked");
_waitTime += 1.0f;
});
}
}
}
async/awaitとObservableの相互変換
async/await => Observable
// Task, ValueTask, UniTaskをObservableに変換する
private void TaskToObservable()
{
// CancellationTokenを指定せずに単に変換するとき
var observable1 = SampleAsync(default).ToObservable();
// Observableの寿命に紐づいたCancellationTokenを取得して、それをasyncメソッドに渡すこともできる
var observable2 = Observable.FromAsync(SampleAsync);
}
private async ValueTask<int> SampleAsync(CancellationToken token)
{
await Task.Delay(1000, token);
return 1;
}
Observable.Create
ファクトリメソッドであるObservable.Create
でもasync/await
を使うことができます。
Observable.Create<int>(async (observer, ct) =>
{
await UniTask.Delay(1000, cancellationToken: ct);
observer.OnNext(1);
await UniTask.Delay(1000, cancellationToken: ct);
observer.OnNext(2);
await UniTask.Delay(1000, cancellationToken: ct);
observer.OnNext(3);
observer.OnCompleted();
}).Subscribe(Console.WriteLine);
1
2
3
Observable => async/await
// Observableをasync/awaitで待つ
// (この辺はRxと大差なし)
private async ValueTask ObservableToTask(CancellationToken ct)
{
// 最初の一個を待つ
var result1 = await Observable.Return(1).FirstAsync(ct);
// 最後の1個を待つ
var result2 = await Observable.Range(0, 10).LastAsync(ct);
}
UniRxを使っていた人は挙動が変わっていることに注意してください。
(従来のFirst
は無いので、Take(1)
で代用してください)
CancellationTokenで停止できるように
各種ファクトリメソッドや、Dispose()
をCancellationToken
に紐付けることができるようになりました。
private void CancelSample<T>(Action<int> sampleEvent, Vector3 vector3, CancellationToken ct)
{
// CancellationTokenでいろいろ停止可能に
Observable.Timer(TimeSpan.FromSeconds(10), ct).Subscribe();
Observable.FromEvent<int>(h => sampleEvent += h, h => sampleEvent -= h, ct).Subscribe();
Observable.Range(0, 10, ct).Subscribe();
Observable.Repeat(1, 10, ct).Subscribe();
Observable.EveryValueChanged(vector3, v => v.y, ct).Subscribe();
// RegisterToでIDisposableをCancellationTokenに登録できる
Observable.Repeat(1, 10, ct)
.Subscribe()
.RegisterTo(ct);
}
public class Vector3
{
public float x;
public float y;
public float z;
}
ReactivePropertyが同梱
ReactivePropertyと同等の機能がR3
に同梱されています。
(UniRxには最初からReactiveProperty
が同梱されていたので、UniRxユーザからすると変化はないです)
ただし定義(実装)は従来のものと変更されているため注意してください。
IReadOnlyReactiveProperty
インタフェースは廃止され、ISubject
を直接実装する形に変更されています。
それによりReactiveProperty
を読み取り専用にして公開する場合はReadOnlyReactiveProperty
(クラス)にキャストして公開する必要があります。
まとめ
R3はReactiveExtensionsの足りなかった部分/使いにくかった部分を解消しています。
また現在主流のasync/await
との調和も考えられており、パフォーマンスも大幅に改善しています。
個人的には今からRx(またはUniRx)を導入するよりは、こちらのR3を導入したほうが良いと考えます。
既にRx(またはUniRx)が導入されているプロジェクトの場合は、R3への置き換えは単純置換では済まないため注意が必要です。
(特にOnError周りの扱いが変更されているため、Observable
のエラーハンドリングをガチガチに組んでる場合は全部組み直しになります)
今後の予定
UniRxとの比較記事を書きます → 書きました