LoginSignup
109
81

次世代Rx「R3」解説

Last updated at Posted at 2024-02-07

今回の内容

C#向けの新しいReactiveExtensions(Rx)ライブラリ「R3」について、従来のRxとの変更点を紹介します。

R3について

リポジトリ

開発元はUniTaskMagicOnionなどを公開しているCysharp社であり、メインの開発者はUniRxの作者でもあるneuecc氏です。

どんなライブラリか

「R3」はReactiveExtensions(Rx)を現代に合わせてより洗練した形に再定義/再実装した、C#用のライブラリです。

というのも、Rxは10年以上前に登場した概念であり、当時はまだ非同期処理に対するベストプラクティスが模索されていたタイミングでした。
そのためRxは「LINQをEventにも適用できる」という側面も持ちつつ「非同期処理にも用いることができる」という2つの側面をもった、少しややこしいライブラリとして登場しました。
そして2024年現在においては、async/awaitが非同期処理のベストプラクティスとされており、Rxをその用途に使うことはほぼ無い状況となっています。

ではRxは完全に廃れたのかというと、そうではありません。「LINQをEventにも適用できる(LINQ to Events)」という概念、PUSH型のメッセージング機構としてRxを捉えると有用性は現代でも存在します。
そのためRxライクな概念(Observable的なもの)は言語を超えて今でも利用されており、UniRxも人気の高いライブラリとして君臨しています。

しかしRxという概念とasync/awaitは近い時代に産まれたものの、お互いに調和が取れた形にはなっていません。
そのため「Rxとasync/awaitを両立できればいいのに…」と感じる場面が多くありました。

そして今回紹介する「R3」はそのRxの原点である「LINQ to Events」という概念を現代に合わせて再考慮し、かつasync/awaitとの調和も取れたメッセージングライブラリとして作られています。
またRxと比較してパフォーマンスが出るようにチューニングされており、かつC#の最新機能を多く取り入れています。

C#における汎用ライブラリとしての定義

Rx(dotnet/reactive)はC#向けのライブラリではありますが、C#を用いているフレームワーク(Unityなど)でそのまま使えるわけではありませんでした。
各フレームワーク向けのチューニングがされておらず、パフォーマンス面で懸念があったり、そもそも導入しても上手く活用できませんでした。
(そのため、Unity向けのRx実装として「UniRx」が専用で作られるなどしていた)

一方、R3は本体である「コアモジュール」と、各フレームワークでR3を動作可能にする「拡張モジュール」から構成されています。
コアモジュールはC#の環境で汎用に動くように(しかも徹底的にチューニングして)作られており、各フレームワークごとの都合は拡張モジュールで吸収するという仕組みになっています。
これにより利用者はどんなフレームワークであったとしても、この強力な次世代RxであるR3を利用可能になりました。

対応フレームワーク

  • Unity
  • Godot
  • Avalonia
  • WPF
  • WinForms
  • Stride,

(2024年2月時点)

従来のRx(dotnet/reactiveやUniRx等)との違い

Observableが再定義されている

R3ではObservableの概念が再定義されています。
ObservableはRxの中核である、PUSH型メッセージングを扱う機構/オブジェクト)

Rxで用いられていたObservableは使いにくい場面や、そもそもパフォーマンスが出せない仕組みになっていました。
R3ではこの既存のObservableの概念を破壊的に変更することにより、利便性とパフォーマンスを向上させています。

メソッド定義の変更

まず既存のRxでは、ObservableSystem定義のインタフェースに依存していました。

従来のRxのIObserver/IObservable定義
namespace System;

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

一方でR3はこのSystem定義のインタフェースに依存していません。
R3が新たに定義したObservable(抽象クラス)を用いており、メソッド定義や挙動が変更されています。

全体を抜粋するとややこしいので、publicメソッドのみを抽出するとR3でのObservable/Observerは次の定義となっています。

R3におけるObserver/Observable
namespace R3;

// 必要な部分だけ取り出しているので、C#の文法としては正しくないです
// 重要なのはどういうメソッド定義がされているか
public abstract class Observable<T>
{
    public IDisposable Subscribe(Observer<T> observer);
}

public abstract class Observer<T> : IDisposable
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);
    public void OnCompleted(Result result);
}

引用元

挙動がどう変わったか

// Rx
public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

// R3
public abstract class Observer<T> : IDisposable
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);
    public void OnCompleted(Result result);
}

Observerの定義を比較すると、OnErrorOnCompletedの挙動が変更されています。

既存のRxでは「例外」が発生したときはOnErrorメッセージが発行されていました。そしてこのOnErrorメッセージが発行されたとき、「Observableは動作を停止する」というルールになっていました。

しかし、例外が出ただけでイベント購読が止まってしまっては困る場面が多々あります。例外が出て止まってしまった場合、再開するには再度Observableを再構築してインスタンス化する必要があります。これはパフォーマンス面でも難がありました。


その挙動がR3では次のように変更されています。

  • 例外の通知ができた上でObservableを停止させない「OnErrorResume()」が定義された
  • もし例外発行して停止したい場合は「OnCompleted()」で通知できる

純粋に「例外が起きた」ということを通知し、Observableの寿命には影響を与えないOnErrorResume()が追加されました。
そして既存のOnError()OnCompleted()は統合され、OnCompleted()発行時に停止理由を送ることが可能となりました。

対応表を作ると、次のようになります。

機能 Rx R3 備考
イベントメッセージの通知 OnNext(T value) OnNext(T value) OnNextは変更なし
正常系でObservableを止める OnCompleted() OnCompleted() R3ではResultを指定しない場合はRxと同じ挙動になる
例外を通知する & Observableを止める OnError(Exception error) OnCompleted(Result result) R3ではResult型に例外を詰めることでRxのOnErrorの代替となる
例外を通知する(Observableは止めない) (できない) OnErrorResume(Exception error) R3にのみ登場

従来のRxでは、「エラー発生時にObservableが止まることを防ぐことはできないので、Operatorを使ってがんばってストリームを復旧する」という方法しか取ることができませんでした。
一方のR3では逆に「エラー時は基本停止しないが、Operatorを使えば停止させることもできる」という方法になっています。
これはパフォーマンスの面でも、制御のしやすさの面でも、R3の挙動のほうが優れています。

すべてのObservableは「完了」する

また大きな挙動の違いとして、ObservableDispose()したときはOnCompletedメッセージが発行されるように変更されています。
そのためSubjectDispose()時や、Observable.FromEventなどにおいてもOnCompletedメッセージの発行が行えるようになっています。

Schedulerの廃止

従来のRxにおいてObservableの時間的な挙動を司る概念としてSchedulerというものがありました。
しかしこのScheduler、性能面では微妙な部分が多かった上、扱い方によってはスレッドが止まってしまうなど、扱いに難がありました。

R3ではこのSchedulerは廃止され、かわりにTimeProviderFrameProviderによって時間(非同期)の管理がされるようになりました。
これによってパフォーマンスが向上しただけでなく、UniRxの独自実装であった「フレーム」という単位で動作するOperatorが、UniRx外のC#が登場するあらゆる場面でも扱えるようになりました。

また同時にFakeTimeProvider/FakeFrameProviderも用意され、これらを用いることでObservableのユニットテストが容易に書けるようになっています。

全体がasync/awaitと調和した挙動に

従来のRxはasync/awaitの概念とは独立して考案され構築されたものでした。
そのため現在において非同期処理の主力であるasync/awaitとの接合は微妙であり、使い勝手はよくありませんでした。

一方でR3では「async/awaitとの調和」を意識して構築されています。
さまざまな場面において自然にasync/awaitを差し込めたり、CancellationTokenによる制御が可能となりました。

SubscribeやOperatorでasync/awaitが使える

R3では一部のOperatorSubscribeをするときにasync/awaitを併用することが可能になっています。
また非同期処理の実行中に次のメッセージが到達した場合の挙動はAwaitOperationで変更することができます。

SubscribeAwait

SubscribeAwaitを用いることでSubscribe時にasync/awaitを利用できるようになりました。
また非同期処理の実行中に次のメッセージが到達した場合の挙動は、後述するAwaitOperationにより変更することができます。

Subscribe時にasync/awaitを使える
private void SubscribeAwaitSample()
{
    var subject = new R3.Subject<string>();

    // Subscribe時にasync/awaitでメッセージをハンドリングできる
    subject.SubscribeAwait(async (text, ct) =>
    {
        await Task.Delay(TimeSpan.FromSeconds(1), ct);
        Console.WriteLine(text);
        
       // await中に次のイベントが到達したときの挙動は変更可能 
    }, AwaitOperation.Sequential);
    
    subject.OnNext("Hello!");
    subject.OnNext("World!");
    subject.OnCompleted();
}
Hello!
World!

SelectAwait/WhereAwait

SelectWhereに非同期版が用意されており、async/awaitとの併用が可能になっています。
こちらも後述するAwaitOperationにより挙動を変更することができます。

SelectAwaitとWhereAwait
private static readonly HttpClient HttpClient = new HttpClient();

private void AwaitableSample(CancellationToken ct)
{
    var subject = new R3.Subject<string>();

    subject
        .Select(x => new Uri(x))
        // 要素が存在するかの事前チェック
        // 並列度を指定して、最大2つまで同時に問い合わせる
        .WhereAwait(HeadAsync, AwaitOperation.Parallel, maxConcurrent: 2)
        // データのダウンロード
        // 並列での実行を許可しない
        .SelectAwait(DownloadAsync, AwaitOperation.Sequential)
        .Subscribe(onNext: result =>
        {
            // ダウンロード結果の表示
            Debug.Log(result.Length);
        });

    // 適当なリソースをダウンロード
    subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/1.jpg");
    subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/2.jpg");
    subject.OnNext("https://media.githubusercontent.com/media/TORISOUP/SequentialTaskExecutors/master/DemoResources/3.jpg");
    subject.OnCompleted();
}

// 対象のURIをHEADしてみて要素が存在するか事前チェックする
private async ValueTask<bool> HeadAsync(Uri path, CancellationToken ct)
{
    using var request = new HttpRequestMessage(new HttpMethod("HEAD"), path);
    using var response = await HttpClient.SendAsync(request, ct);
    return response.IsSuccessStatusCode;
}

// 対象のデータダウンロード
private async ValueTask<byte[]> DownloadAsync(Uri uri, CancellationToken ct)
{
    using var response = await HttpClient.GetAsync(uri, ct);
    return await response.Content.ReadAsByteArrayAsync();
}

非同期処理の挙動の調整:AwaitOperation

SubscribeAwait/WhereAwait/SelectAwaitAwaitOperationを指定することができます。
このAwaitOperationにより、非同期処理の実行中(awaitの処理が終わる前)に次のメッセージが到達してしまったときの挙動を調整することができます。

AwaitOperation await中に次のイベントが来たときの挙動 備考
Sequential 今実行中の処理を優先。余剰なイベントはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。
Drop 今実行中の処理を優先。余剰なイベントは無視してなかったことにする。
Switch 今実行中の非同期処理をキャンセル。 新しく到達したイベントの処理を優先して開始する。 キャンセル処理はCancellationTokenを使って自分で実装する必要がある。
Parallel 新しく来たイベントを即座に処理する。処理が終わったものから早いもの勝ちで出力される。 maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。
SequentialParallel※ 新しく来たイベントを即座に処理する。処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。
ThrottleFirstLast 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 ThrottleFirstThrottleLastが合体した挙動

SequentialParallelはWhereAwait/SelectAwaitでのみ利用可

Debounce/ThrottleFirst/ThrottleLast

Debounce/ThrottleFirst/ThrottleLastasync/awaitに対応しています。

Unityでのボタンクリックの例
using System;
using Cysharp.Threading.Tasks;
using R3;
using UnityEngine;
using UnityEngine.UI;

namespace Scenes
{
    public class ThrottleFirstSample : MonoBehaviour
    {
        [SerializeField] private Button _button;

        private float _waitTime = 1.0f;

        private void Start()
        {
            // ボタンがクリックされたら一定時間ボタンイベントを無視する
            // 無視する期間は実行することに伸びる
            _button.OnClickAsObservable()
                .ThrottleFirst(async (_, ct) =>
                {
                    await UniTask.Delay(TimeSpan.FromSeconds(_waitTime), cancellationToken: ct);
                })
                .Subscribe(_ =>
                {
                    Debug.Log("Clicked");
                    _waitTime += 1.0f;
                });
        }
    }
}

async/awaitとObservableの相互変換

async/await => Observable

async/await(Task類)からObservableへの変換
// Task, ValueTask, UniTaskをObservableに変換する
private void TaskToObservable()
{
    // CancellationTokenを指定せずに単に変換するとき
    var observable1 = SampleAsync(default).ToObservable();

    // Observableの寿命に紐づいたCancellationTokenを取得して、それをasyncメソッドに渡すこともできる
    var observable2 = Observable.FromAsync(SampleAsync);
}

private async ValueTask<int> SampleAsync(CancellationToken token)
{
    await Task.Delay(1000, token);
    return 1;
}

Observable.Create

ファクトリメソッドであるObservable.Createでもasync/awaitを使うことができます。

Observable.Create<int>(async (observer, ct) =>
{
    await UniTask.Delay(1000, cancellationToken: ct);
    observer.OnNext(1);
    
    await UniTask.Delay(1000, cancellationToken: ct);
    observer.OnNext(2);
    
    await UniTask.Delay(1000, cancellationToken: ct);
    observer.OnNext(3);
    observer.OnCompleted();

}).Subscribe(Console.WriteLine);
1
2
3

Observable => async/await

Observableをasync/awaitで扱う
// Observableをasync/awaitで待つ
// (この辺はRxと大差なし)
private async ValueTask ObservableToTask(CancellationToken ct)
{
    // 最初の一個を待つ
    var result1 = await Observable.Return(1).FirstAsync(ct);

    // 最後の1個を待つ
    var result2 = await Observable.Range(0, 10).LastAsync(ct);
}

UniRxを使っていた人は挙動が変わっていることに注意してください。
(従来のFirstは無いので、Take(1)で代用してください)

CancellationTokenで停止できるように

各種ファクトリメソッドや、Dispose()CancellationTokenに紐付けることができるようになりました。

CancellationTokenによる停止
private void CancelSample<T>(Action<int> sampleEvent, Vector3 vector3, CancellationToken ct)
{
    // CancellationTokenでいろいろ停止可能に
    Observable.Timer(TimeSpan.FromSeconds(10), ct).Subscribe();
    Observable.FromEvent<int>(h => sampleEvent += h, h => sampleEvent -= h, ct).Subscribe();
    Observable.Range(0, 10, ct).Subscribe();
    Observable.Repeat(1, 10, ct).Subscribe();
    Observable.EveryValueChanged(vector3, v => v.y, ct).Subscribe();

    // RegisterToでIDisposableをCancellationTokenに登録できる
    Observable.Repeat(1, 10, ct)
        .Subscribe()
        .RegisterTo(ct);
}

public class Vector3
{
    public float x;
    public float y;
    public float z;
}

ReactivePropertyが同梱

ReactivePropertyと同等の機能がR3に同梱されています。
(UniRxには最初からReactivePropertyが同梱されていたので、UniRxユーザからすると変化はないです)

ただし定義(実装)は従来のものと変更されているため注意してください。
IReadOnlyReactivePropertyインタフェースは廃止され、ISubjectを直接実装する形に変更されています。
それによりReactivePropertyを読み取り専用にして公開する場合はReadOnlyReactiveProperty(クラス)にキャストして公開する必要があります。

まとめ

R3はReactiveExtensionsの足りなかった部分/使いにくかった部分を解消しています。
また現在主流のasync/awaitとの調和も考えられており、パフォーマンスも大幅に改善しています。

個人的には今からRx(またはUniRx)を導入するよりは、こちらのR3を導入したほうが良いと考えます。

既にRx(またはUniRx)が導入されているプロジェクトの場合は、R3への置き換えは単純置換では済まないため注意が必要です。
(特にOnError周りの扱いが変更されているため、Observableのエラーハンドリングをガチガチに組んでる場合は全部組み直しになります)

今後の予定

UniRxとの比較記事を書きます → 書きました

109
81
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
109
81