この記事について
次世代RxであるR3に登場するオペレーターおよびファクトリーのまとめ記事です。
R3自体はフレームワークに依存せずにピュアなC#で用いることができるライブラリです。
ですが本記事では UniRxとの比較を行うためにUnity上でR3を動作させています。
執筆環境
- Unity 6.0.30f1
- R3/R3.Unity Ver.1.3.0
- UniRx Ver.7.1.0
- UniTask Ver.2.5.10
サンプルプロジェクト
オペレーターおよびファクトリーの動作説明のために用意したサンプルプロジェクトがあります。
また一部のオペレーターの動作確認を行えるDEMOを作成してあります。
R3について
概要
R3についての解説は以前に執筆しているのでこちらを御覧ください。
概念・用語集
本記事で登場する単語について解説が必要そうなものを取り上げて紹介します。
用語 | 意味 | 補足 |
---|---|---|
Observable |
RxやR3における中核となる概念・オブジェクト。イベントメッセージを伝達し途中にオペレーターを介することでメッセージの加工ができる存在。イベントシーケンスやイベントストリームなどと呼ばれることもある。 | いわゆる「Subscribe() して使うヤツ」のこと |
Observer |
Observable に登録してメッセージを受信する側のオブジェクトのこと。 |
|
オペレーター | Operator。Observable に連結してメッセージの加工や挙動の変更を行うことができる仕組み。 |
|
ファクトリー | Factory、ファクトリーメソッド。Observable を生成するためのstaticなメソッド群のこと。 |
|
購読/購読する |
Observable にObserver を登録してメッセージの受信体制を構築すること。その処理の代表格がSubscribe() なので「Subscribeする」という言い回しを使うこともある。 |
なおForEachAsync() や~Async() などもSubscribe() と同等の処理を行っている。そのため「購読する」「Subscribeする」といった言い回しをした場合はこれらオペレーターも含んでいる。とにかく「Observable にObserver を登録する処理」を引っくるめてこう呼ぶ。 |
メッセージ | R3のObservable で扱われるOnNext /OnErrorResume /OnCompleted のこと。文脈に寄ってはOnNext 単体を指すこともある。 |
Observable を流れてくるイベントメッセージのこと。 |
OnNext |
Observable で扱われる常用のメッセージのこと。Observable<T> のT の部分。 |
|
OnErrorResume |
Observable で扱われるエラー通知用のメッセージのこと。OnErrorResume は発行されたとしてもObservable の動作は継続する。 |
UniRxではOnError の発行をもってObservable は解体されていた。 |
OnCompleted |
Observable で扱われるObservable の動作完了用のメッセージのこと。内容によって正常終了と異常終了のどちらかを表現できる。 |
中身が空なら正常終了。例外が入っていた場合は異常終了。 |
完了/完了する |
OnCompleted のうち正常終了の方のメッセージが発行されてObservable が動作を停止すること。 |
「Task が完了する」という場合もだいたい同じ意味。 |
異常終了 |
OnCompleted のうち異常終了の方のメッセージが発行されてObservable が動作を停止すること。「OnCompleted(Exception) が発行される」もこの意味と同じ。 |
R3のオペレーター一覧
R3のオペレーターを列挙して説明します。
「UniRx」の項目はUniRxに同じ動作をするオペレーターが存在する場合に記載しています。(括弧)で包まれている場合は「似てるけどちょっと違うもの」を挙げています。
また オペレーター名の列からそのオペレーターを実際に動作させているサンプルコード(テストコード)に飛ぶことができます。
変換系
OnNext
の内容を変化させるものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
AsUnitObservable | AsUnitObservable |
Observable<Unit> に変換する。 |
型の情報を落とす | Observable<Unit> |
Cast | Cast | OnNextの型を指定した型にキャストする。OfTypeに似ているが、こちらはキャストできない場合はOnErrorResumeが発行される。 | 型変換する | Observable<TResult> |
Chunk | Buffer | 発行されたOnNextの中身を指定した個数でまとめて1つのOnNextとして発行する。まとめる必要がない場合はThrottleLastでも良い。 | まとめる | Observable<T[]> |
Chunk | Buffer | 発行されたOnNextの中身を指定した時間区間でまとめて1つのOnNextとして発行する。時間のカウントはOnNextが発行されたタイミングでスタートする。まとめる必要がない場合はThrottleLastでも良い。 | まとめる | Observable<T[]> |
Chunk | Buffer | 発行されたOnNextをまとめて、別のObservableのOnNextタイミングで1つのOnNextとして発行する。 | まとめる | Observable<T[]> |
ChunkFrame | BatchFrame | 指定したUnityのフレーム区間内に発行されたOnNextをまとめて1つのOnNextして発行する。 | まとめる | Observable<T[]> |
ChunkUntil | - | 発行されたOnNextの中身を条件を満たすまでまとめ1つのOnNextとして発行する。 | まとめる | Observable<T[]> |
FrameCount | - | OnNextが発行されたときに、それが何フレーム目に発行されたのかとセットにする。何フレーム目であるかはFrameProviderによって定まる。 | 時間を測る | Observable<(long, T)> |
FrameInterval | FrameInterval | OnNextが発行されたときに、1つ前のOnNext発行から何フレーム経過しているかとセットにする。フレーム数のカウントはFrameProviderによって定まる。 | 時間を測る | Observable<(long, T)> |
Index | (Select) | OnNextメッセージにIndexを振る。 | 採番する | Observable<(int, T)> |
Pairwise | Pairwise | 現在のOnNextの値と、その1つ前のOnNextの値をペアにする。 | 直前の値と最新の値を取り出す | Observable<(T, T)> |
Scan | Scan | OnNextが発行されるたびに畳み込み(fold)計算を行いその結果をOnNextとして発行する。AggregateAsyncに似ているが、こちらは型がObservable<T> である。 |
畳み込む | Observable<T> |
Select | Select | OnNextの値を用いて処理を実行しその結果に内容を差し替える。 | 変換する | Observable<T> |
SelectAwait | - | OnNextの値を用いて非同期処理を実行しその結果待って新しいOnNextとして発行する。指定するAwaitOperationで挙動が変化する。 | 変換する | Observable<T> |
SelectMany | SelectMany |
Observable<T> から新たなObservable<TResult> を生成し、それを並列に合成する。 |
変換する、flatMap | Observable<TResult> |
TimeInterval | TimeInterval/FrameTimeInterval | 直前のOnNextと今のOnNextの時間間隔を計測し、結果をまとめてOnNextとして発行する。 | 時間を測る | Observable<(TimeSpan, T)> |
Timestamp | (Tmestamp) | OnNextにタイムスタンプを付与する。タイムスタンプはTimeProvider.GetTimestapmの値である。 | 時間を測る | Observable<(long, T)> |
フィルタ系
メッセージのフィルタリングを行うものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
Debounce | Throttle | メッセージの流量を減らす。OnNextがまとめて発行されたときに、最後に値が発行されてから一定時間経過後に最後のOnNextを1つだけ発行する。 | 流量を絞る | Observable<T> |
Debounce | - | メッセージの流量を減らす。OnNextがまとめて発行されたときに、非同期処理を実行し完了時に最後のOnNextを1つだけ発行する。 | 流量を絞る | Observable<T> |
DebounceFrame | ThrottleFrame | メッセージの流量を減らす。OnNextがまとめて発行されたときに、最後に値が発行されてから一定フレーム経過後に最後のOnNextを1つだけ発行する。 | 流量を絞る | Observable<T> |
Distinct | Distinct | OnNextから重複した値のメッセージを除外する。 | 重複削除 | Observable<T> |
DistinctBy | Distinct | OnNextの値を用いて加工を行い、その結果として重複したメッセージを除外する。 | 重複削除 | Observable<T> |
DistinctUntilChanged | DistinctUntilChanged | OnNextから連続で重複した値のメッセージを除外する。 | 連続した重複を削除 | Observable<T> |
DistinctUntilChangedBy | DistinctUntilChanged | OnNextの値を用いて加工を行い、その結果として連続で重複したメッセージを除外する。 | 連続した重複を削除 | Observable<T> |
IgnoreElements | IgnoreElements | OnNextを無視する。 | 無視する | Observable<T> |
IgnoreOnErrorResume | (CatchIgnore) | OnErrorResumeを無視する。 | 無視する | Observable<T> |
OfType | OfType | OnNextの型を指定した型にキャストする。Castと似ているが、こちらは型変換ができない場合は無視される。 | 型変換する | Observable<TResult> |
Skip | Skip | 購読開始から指定した個数のOnNextを無視する。 | OnNextを無視する | Observable<T> |
Skip | Skip | 購読開始から指定した時間OnNextを無視する。 | OnNextを無視する | Observable<T> |
SkipFrame | - | 購読開始から指定したフレーム間のOnNextを無視する。 | OnNextを無視する | Observable<T> |
SkipLast | - | Observable完了時に最後から指定した時間のOnNextを無視する。 | OnNextを無視する | Observable<T> |
SkipLastFrame | - | Observable完了時に最後から指定したフレーム間のOnNextを無視する。 | OnNextを無視する | Observable<T> |
SkipUntil | - | 非同期処理が完了するまでの間はOnNextメッセージを無視し続ける。 | OnNextを無視する | Observable<T> |
SkipUntil | - | CancellationTokenがキャンセルされるまでの間はOnNextメッセージを無視し続ける。 | OnNextを無視する | Observable<T> |
SkipUntil | SkipUntil | 他のObservableのOnNextが到達するまでの間はOnNextメッセージを無視し続ける。 | OnNextを無視する | Observable<T> |
SkipWhile | SkipWhile | OnNextメッセージが条件を満たす間はOnNextを無視し続ける。一度条件を満たさなくなったらそれ以降のOnNextはすべて通過させる。 | OnNextを無視する | Observable<T> |
Take | Take | 購読開始から指定した個数のOnNextを通過させ、指定数を超えたらOnCompletedを発行する。 | 最初の方の値を取り出す | Observable<T> |
Take | Take | 購読開始から指定した時間OnNextを通過させ、その時間を超えたらOnCompletedを発行する。 | 最初の方の値を取り出す | Observable<T> |
TakeFrame | - | 購読開始から指定したフレーム間OnNextを通過させ、そのフレーム数を超えたらOnCompletedを発行する。 | 最初の方の値を取り出す | Observable<T> |
TakeLast | TakeLast | OnCompletedが発行されたときに最後から指定した個数のOnNextのみを取り出して発行する。 | 最後の方の値を取り出す | Observable<T> |
TakeLast | TakeLast | OnCompletedが発行されたときに最後から指定した時間以内のOnNextのみを取り出して発行する。 | 最後の方の値を取り出す | Observable<T> |
TakeLastFrame | - | OnCompletedが発行されたときに最後から指定したフレーム数以内のOnNextのみを取り出して発行する。 | 最後の方の値を取り出す | Observable<T> |
ThrottleFirst | ThrottleFirst | OnNextが発行されたらまずそれを通過させ、その次以降のOnNextを遮断する。遮断状態は一定時間経過で解除される。 | 流量を絞る | Observable<T> |
ThrottleFirst | - | OnNextが発行されたらまずそれを通過させ、その次以降のOnNextを遮断する。遮断状態は別のObservableからOnNextが発行されたら解除される。 | 流量を絞る | Observable<T> |
ThrottleFirst | - | OnNextが発行されたらまずそれを通過させ、非同期処理を実行する。その非同期処理が完了するまでの間OnNextを遮断する。 | 流量を絞る | Observable<T> |
ThrottleFirstFrame | ThrottleFirstFrame | OnNextが発行されたら次以降のOnNextを遮断する。遮断状態は一定フレーム経過で解除される。 | 流量を絞る | Observable<T> |
ThrottleFirstLast | - | OnNextが発行されたらまずそれを通過させ、一定時間の間OnNextを遮断する。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 | 流量を絞る | Observable<T> |
ThrottleFirstLast | - | OnNextが発行されたらまずそれを通過させ、、それ以降のOnNextを遮断する。指定した別のObservableからOnNextが発行されると遮断は解除される。遮断中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 | 流量を絞る | Observable<T> |
ThrottleFirstLast | - | OnNextが発行されたらまずそれを通過させ、、非同期処理を実行しその実行中はそれ以降のOnNextを遮断する。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 | 流量を絞る | Observable<T> |
ThrottleFirstLastFrame | - | OnNextが発行されたら次以降のOnNextを遮断する。遮断状態は一定フレーム経過で解除される。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 | 流量を絞る | Observable<T> |
ThrottleLast | Sample | OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。遮断状態は一定時間経過で解除され、解除時に一番最後に発行されていたOnNextを1つ発行する。Chunkに似ているがこちらは値をまとめることはない。 | 流量を絞る | Observable<T> |
ThrottleLast | Sample | OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。別のObservableを指定しそのObservableからOnNextが発行されると遮断状態は解除される。解除時に一番最後に発行されていたOnNextを1つ発行する。Chunkに似ているがこちらは値をまとめることはない。 | 流量を絞る | Observable<T> |
ThrottleLast | - | OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断し、非同期処理を実行する。その非同期処理が完了すると遮断状態は解除される。解除時に一番最後に発行されていたOnNextを1つ発行する。 | 流量を絞る | Observable<T> |
ThrottleLastFrame | SampleFrame | OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。遮断状態は一定フレーム経過で解除され、解除時に一番最後に発行されていたOnNextを1つ発行する。ChunkFrameに似ているがこちらは値をまとめることはない。 | 流量を絞る | Observable<T> |
Where | Where | OnNextの値を条件でフィルタリングする。 | フィルタリングする | Observable<T> |
WhereAwait | - | OnNextの値を条件でフィルタリングする。その際の条件判定に非同期処理を用いることができる。指定するAwaitOperationで挙動が変化する。 | フィルタリングする | Observable<T> |
WhereNotNull | - | Observableに対し、値がnullでない場合のみにフィルタリングする。 | フィルタリングする | Observable<T> |
時間や実行コンテキストの制御系
メッセージの発行タイミングや、実行コンテキスト(スレッド)などを変化させるものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
Delay | Delay | 発行されたすべての種別のメッセージを指定時間だけ遅延させる。 | 遅延させる | Observable<T> |
DelayFrame | DelayFrame | 発行されたすべての種別のメッセージを指定フレームだけ遅延させる。 | 遅延させる | Observable<T> |
DelaySubscription | DelaySubscription | Subscribeの実行を指定時間だけ遅延させる。 | 遅延して購読する | Observable<T> |
DelaySubscriptionFrame | DelayFrameSubscription | Subscribeの実行を指定フレームだけ遅延させる。 | 遅延して購読する | Observable<T> |
ObserveOn | ObserveOn | Observableの実行コンテキストを別のコンテキストに変更する。実行スレッドを変更したり、別のタイミングに移したりできる。 | 実行コンテキストを変える | Observable<T> |
ObserveOnCurrentSynchronizationContext | - | Observableの実行コンテキストをSynchronizationContextを用いて変更する。SynchronizationContextはObserveOnCurrentSynchronizationContextを呼び出したタイミングにおけるSynchronizationContext.Currentが用いられる。SynchronizationContext.Currentがnullの場合はスレッドプールで代用される。 | 実行コンテキストを変える | Observable<T> |
ObserveOnMainThread | ObserveOnMainThread | Unity専用。Observableの実行コンテキストをUnityメインスレッドに変更する。 | 実行コンテキストを変える | Observable<T> |
ObserveOnThreadPool | ObserveOn | Observableの実行コンテキストをThreadPoolに変更する。 | 実行コンテキストを変える | Observable<T> |
SubscribeOn | SubscribeOn | 「Subscribe()の内部処理」の実行コンテキストを指定できる。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。発行されるメッセージの実行コンテキストを切り替えたいときはObserveOnを使うべき。 | 購読のコンテキストを変える | Observable<T> |
SubscribeOnCurrentSynchronizationContext | - | 「Subscribe()の内部処理」を現在の実行コンテキスト上のSynchronizationContextを指定して行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 | 購読のコンテキストを変える | Observable<T> |
SubscribeOnMainThread | 「Subscribe()の内部処理」をメインスレッド上で行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 | 購読のコンテキストを変える | Observable<T> |
|
SubscribeOnSynchronize | - | 「Subscribe()の内部処理」の実行に排他ロックをかける。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 | 購読のコンテキストを変える | Observable<T> |
SubscribeOnThreadPool | 「Subscribe()の内部処理」をスレッドプール上で行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 | 購読のコンテキストを変える | Observable<T> |
Observableの性質変化系
Observable
そのものの挙動を変化させるものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
Append | - | OnCompleted発行時に、Observableの最後に指定された値を挿入する。 | 最後に足す | Observable<T> |
DefaultIfEmpty | DefaultIfEmpty | OnNextが一度も発行されずにObservableが完了したときに、値を1つだけ発行する。 | 空のときの処理を足す | Observable<T> |
OnErrorResumeAsFailure | - | OnErrorResumeをOnCompleted(Exception)に変換する。 | 異常終了にする | Observable<T> |
Prepend | StartWith | Observableの購読直後に指定した値を挿入する。 | 最初に足す | Observable<T> |
TakeUntil | TakeUntil | 指定した別のObservableのOnNextが到達した場合にOnCompletedを発行し本流の購読を打ち切る。 | 購読を止める条件を追加する | Observable<T> |
TakeUntil | TakeUntil | OnNextの値が条件を満たしたときにOnCompletedを発行して購読を打ち切る。 | 購読を止める条件を追加する | Observable<T> |
TakeUntil | - | CancellationTokenがキャンセルされたときにOnCompletedを発行して購読を打ち切る。 | 購読を止める条件を追加する | Observable<T> |
TakeUntil | - | 非同期処理の完了を待機し、その完了時にOnCompletedを発行して購読を打ち切る。 | 購読を止める条件を追加する | Observable<T> |
TakeWhile | TakeWhile | OnNextが条件を満たし続ける間、OnNextを通過させ続ける。条件を満たさないOnNextが発行されるとOnCompletedを発行し購読を打ち切る。 | 購読を止める条件を追加する | Observable<T> |
Timeout | (Timeout) | OnNextの発行間隔が指定したTimeSpan以上開いたときにOnCompleted(TimeoutException)を発行してObservableを異常終了させる。 | タイムアウト条件を足す | Observable<T> |
TimeoutFrame | (TimeoutFrame) | OnNextの発行間隔が指定したフレーム間隔以上開いたときにOnCompleted(TimeoutException)を発行してObservableを異常終了させる。 | タイムアウト条件を足す | Observable<T> |
Trampoline | ObserveOn(Scheduler.CurrentThread) | Observableのメッセージが再帰した際に末尾再帰に変換する。 | 末尾再帰にする | Observable<T> |
Observableの合成系
複数のObservable
を用いて1つのObservable
を構築するものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
Amb | Amb | 2つのObservableを同時に購読し、OnNextが先着した方のObservableのみを採択する。 | 先着したものを使う | Observable<T> |
Catch | Catch | 購読中のObservableからOnCompleted(Exception)が発行された場合、指定したObservableに購読先を切り替える。 | 異常終了の対応 | Observable<T> |
CombineLatest | CombineLatest | 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。 | 複数のObservableをまとめる | Observable<TResult> |
Concat | Concat | OnCompleted発行時に次のObservableに購読先を切り替える(Observableを直列に合成する)。 | 直列に連結する | Observable<T> |
Merge | Merge | 2つのObservableのメッセージを混ぜて1つのObservableとして扱う(Observableを並列に合成する)。 | 並列合成 | Observable<T> |
Switch | Switch | Observable<`Observable`>に対してのみ利用可能。新しく発行された`Observable`に対して購読を次々に乗り換えることができる。 | 購読先を切り替える | Observable<T> |
WithLatestFrom | WithLatestFrom | メインとなるObservableにOnNextが入力されたタイミングで別のObservableの最新値と合成して出力する。サブObservable側に一度もOnNextが発行されていない場合は何も出力されない。 | 2つのObservableを合成する | Observable<TResult> |
Zip | Zip | 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。 | 複数のObservableをまとめる | Observable<TResult> |
ZipLatest | ZipLatest | 複数のObservableの入力のうち最新のものを1つだけ保持し、揃ったものから順次1つのOnNextとして発行する。CombineLatestとの違いはこちらは一度発行した値は再利用されない。 | 複数のObservableをまとめる | Observable<TResult> |
Hot変換系
Hot変換目的のものをここにまとめています。Hot変換については過去に記事としてまとめているのでこちらをご参照ください。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
Multicast | Multicast |
Observable<T> を購読しその結果をすべて別のISubjectに流し込む。 |
他のSubjectに連結する | ConnectableObservable<T> |
Publish | Publish | Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new Subject())またMulticast(new BehaviorSubject(initValue))と同義。 | Hot変換する | ConnectableObservable<T> |
RefCount | RefCount |
ConnectableObservable<T> のConnect()とそのDispose()を自動化する。購読者が1つ以上いる場合は自動的にConnect()し、購読者が0になったら自動的にDispose()する。Dispose()後もまた新たに購読者がきた場合は再度Connect()する。 |
Publish()を自動化する | Observable<T> |
Replay | Replay | Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new ReplaySubject())と同義。 | Hot変換する | ConnectableObservable<T> |
ReplayFrame | - | Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new ReplayFrameSubject())と同義。 | Hot変換する | ConnectableObservable<T> |
Share | Share | Publish().RefCount()と同義。 | Publish().RefCount()と同じ | Observable<T> |
Task変換系
Observable
をTask
に変換して扱うことができるものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
AggregateAsync | (Aggregate) | OnCompleted発行時に、過去に発行されたOnNextを畳み込み(fold)計算する。Scanに似ているが、こちらはTask<T> である。 |
畳み込む | Task<T> |
AggregateByAsync | - | OnCompleted発行時に、過去に発行されたOnNextをkeySelectorでグループ化し、グループ単位で畳み込み(fold)計算する。 | 畳み込む | Task<T> |
AllAsync | - | OnCompleted発行時に、過去に発行されたすべてのOnNextが条件を満たしているかを調べる。 | すべて合ってるか | Task<bool> |
AnyAsync | - | 条件を満たしたOnNextが発行されたら即座に完了するTask<bool> に変換する。ContainsAsyncとの違いはFuncを引数に取る。 |
どれか一個でも合ってるか | Task<bool> |
AverageAsync | - | OnCompleted発行時に、購読中に発行されたOnNextの平均値を求める。 | 平均する | Task<double> |
ContainsAsync | - | 指定した値を含んだOnNextが発行されたら即座に完了するTask<bool> に変換する。AnyAsyncとの違いはこちらは値そのものを引数に取る。 |
含むか調べる | Task<bool> |
CountAsync | - | OnCompleted発行時に、購読中に発行された条件を満たす値の個数を返す。 | 数える | Task<int> |
ElementAtAsync | - | 指定した個数番目(ゼロオリジン)のOnNextの値を返すTask<T> に変換する。指定した個数に達せずにObservableが完了したときはArgumentOutOfRangeExceptionとなる。 |
要素を取り出す | Task<T> |
ElementAtOrDefaultAsync | - | 指定した個数番目(ゼロオリジン)のOnNextの値を返すTask<T> に変換する。指定した個数に達せずにObservableが完了したときは既定値となる。 |
要素を取り出す | Task<T> |
FirstAsync | (First) | 条件を満たしたOnNextが発行されたら即座に完了するTask<T> に変換する。条件を満たすOnNextが発行されずにObservableが完了したときはInvalidOperationExceptionとなる。 |
最初のメッセージを待つ | Task<T> |
FirstOrDefaultAsync | (FirstOrDefault) | 条件を満たしたOnNextが発行されたら即座に完了するTask<T> に変換する。条件を満たすOnNextが発行されずにObservableが完了したときは既定値となる。 |
最初のメッセージを待つ | Task<T> |
IsEmptyAsync | - | OnNextが一度も発行されずにObservableが完了したかどうかを判定するTask<bool> に変換する。 |
空か調べる | Task<bool> |
LastAsync | (Last) | 一番最後に条件を満たしたOnNextを返すTask<T> に変換する。条件を満たすOnNextが発行されずにObservableが完了したときはInvalidOperationExceptionとなる。 |
最後のメッセージを待つ | Task<T> |
LastOrDefaultAsync | (LastOrDefault) | 一番最後に条件を満たしたOnNextを返すTask<T> に変換する。条件を満たすOnNextが発行されずにObservableが完了したときは既定値となる。 |
最初のメッセージを待つ | Task<T> |
LongCountAsync | - | OnCompleted発行時に、購読中に発行された条件を満たす値の個数を返す。CountAsyncとの違いは型がlongになっている。 | 数える | Task<long> |
MaxAsync | - |
Observable<T> の完了時に指定した条件下での最大値(TResult型)を返すTaskに変換する。MaxByAsyncとの違いはこちらは「結果となる最大値そのもの」を返す。 |
最大値を調べる | Task<TResult> |
MaxByAsync | - |
Observable<T> の完了時に指定した条件下での最大値を含んだT型を返すTaskに変換する。MaxAsyncとの違いはこちらは「最大値を含んだもとのOnNextの値」を返す。 |
最大値を含むものを調べる | Task<T> |
MinAsync | - |
Observable<T> の完了時に指定した条件下での最小値(TResult型)を返すTaskに変換する。MinByAsyncとの違いはこちらは「結果となる最大値そのもの」を返す。 |
最小値を調べる | Task<TResult> |
MinByAsync | - |
Observable<T> の完了時に指定した条件下での最小値を含んだT型を返すTaskに変換する。MinAsyncとの違いはこちらは「最小値を含んだもとのOnNextの値」を返す。 |
最小値を含むものを調べる | Task<T> |
MinMaxAsync | - |
Observable<T> の完了時に指定した条件下での最小値と最大値(TResult型)を返すTaskに変換する。 |
最小値と最大値を調べる | Task<(TResult, TResult)> |
SequenceEqualAsync | - | 2つのObservableが発行する値が順序含め完全に一致するかを表すTask<bool> を返す。一致しないと判断された場合は直ちにfalseで終了するが、完全に一致するかどうかの判定を行う(trueを返す)ためには両者のObservableが完了する必要がある。 |
一致するか調べる | Task<bool> |
SingleAsync | (Single) | 購読開始から完了までの間に、条件を満たしたOnNextが1つだけであるかを調べその結果を返すTask<T> に変換する。条件を満たすOnNextが発行されなかった、または2つ以上発行された場合はInvalidOperationExceptionとなる。 |
1個だけか調べる | Task<T> |
SingleOrDefaultAsync | (SingleOrDefault) | 購読開始から完了までの間に、条件を満たしたOnNextが1つだけであるかを調べその結果を返すTask<T> に変換する。条件を満たすOnNextが発行されなかった場合は既定値が発行される。2つ以上発行された場合はInvalidOperationExceptionとなる。 |
1個だけか調べる | Task<T> |
SumAsync | - | OnNextの合算値を返すTask<T> に変換する。このTaskはOnCompleted発行時に完了する。 |
合計する | Task<T> |
ToArrayAsync | (ToArray) | Observableの完了時に購読開始から発行されたOnNextを配列にまとめて返すTaskに変換する。 | 配列に変換する | Task<T[]> |
ToDictionaryAsync | - | Observableの完了時に購読開始から発行されたOnNextをDictionaryにまとめて返すTask<Dictionary<TKey, TElement>> に変換する。 |
辞書に変換する | Task<Dictionary<TKey, TElement>> |
ToHashSetAsync | - | Observableの完了時に購読開始から発行されたOnNextをHashSetにまとめて返すTask>に変換する。 |
HashSet<T> に変換する |
Task<HashSet<T>> |
ToListAsync | - | Observableの完了時に購読開始から発行されたOnNextをListにまとめて返すTask>に変換する。 |
List<T> に変換する |
Task<List<T>> |
ToLookupAsync | - | Observableの完了時に購読開始から発行されたOnNextをLookUpにまとめて返すTask>に変換する。 |
ILookUp<TKey, T> に変換する |
Task<ILookup<TKey, T>> |
WaitAsync | (Wait), (ToTask) | Observableが完了するのを待機するTaskに変換する。 | 待つ | Task |
その他
上記の分類に当てはまらないもの、または特殊なものをここにまとめています。
R3 | UniRx | 機能 | 一言でまとめると | 戻り値 |
---|---|---|---|---|
ForEachAsync | (ForEachAsync) | Observableを購読しつつそれが完了まで待機するTaskに変換する。要するに完了までawaitができるSubscribe()。 | Subscribe()をawatiできるようにする | Task |
AsObservable | AsObservable | Observable型に変換する。ソースとなっている型(SubjectやReactivePropertyなど)へのダウンャストを防止できる。 | Observableに変換する | Observable<T> |
AsSystemObservable | - | R3.Observable<T> からSystem.IObservable<T> へと変換する。 |
Rx/UniRxに繋ぎこむ | System.IObservable<T> |
Dematerialize | Dematerialize | Materializeを解除する。 | Materializeを解除する | Observable<T> |
Do | Do/DoOnError/DoOnCompleted/DoOnSubscribe | ObservableのOnNext/OnErrorResume/OnCompleted/Subscribeのタイミングで任意のデリゲートを実行する。 | 副作用を起こす | Observable<T> |
DoCancelOnCompleted | - | OnCompleted時にCancellationTokenSourceをキャンセルする。 | CancellationTokenSourceと連動させる | Observable<T> |
Materialize | Materialize | OnNext/OnErrorResume/OnCompletedをすべてOnNext型に変換する。Materializeを適用したObservableはDematerializeを適用することでもとに戻る。 | すべてをOnNextに変換する。 | Observable<Notification<T>> |
SubscribeAwait | - | Subscribe()のメッセージハンドリング処理のasync/await対応版。指定するAwaitOperationで挙動が変化する。 | 非同期処理を使う | IDisopsable |
Synchronize | Synchronize | それ以降のOnNext処理に排他ロックをかける。ただしSelectAwaitやWhereAwaitなどの非同期処理が使えるOperatorを挟んだ場合は排他ロックが外れるため注意。 | 排他ロックする | Observable<T> |
ToAsyncEnumerable | - |
Observable<T> をIAsyncEumerable<T> に変換する。 |
IAsyncEnumerable<T> に変換する |
IAsyncEnumerable<T> |
ToLiveList | - | 発行されたOnNextをリアルタイムに格納するコレクションLiveList<T> に変換する。 |
リアルタイムに変動するコレクションに変換する | LiveList<T> |
R3のファクトリー一覧
R3のファクトリーメソッドを列挙して紹介します。
メソッド | 機能 | 戻り値 |
---|---|---|
Amb | 複数のObservableを同時に購読し、OnNextが先着した方のObservableのみを採択する。 | Observable<T> |
CombineLatest | 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。 | Observable<T> |
Concat | OnCompleted発行時に次のObservableに購読先を切り替える(Observableを直列に合成する)。 | Observable<T> |
Create | 手続き処理を用いて任意のタイミングでメッセージを発行するObservableを作成する。 | Observable<T> |
CreateFrom | IAsyncEnumerableをObservable<T> に変換する。 |
Observable<T> |
Defer | Observableの評価タイミングをSubscribe時まで遅延させる。 | Observable<T> |
Empty | 購読直後にOnCompleted()を発行する。 | Observable<T> |
EveryUpdate | 毎フレームOnNext(Unit)を発行する。フレームタイミングは指定可能。 | Observable<Unit> |
EveryValueChanged | 指定オブジェクトを毎フレームチェックし、差分があれば値を取り出して発行する。 | Observable<TProperty> |
FromAsync | 非同期処理(ValueTask)をObservableに変換する。 | Observable<T> |
FromEvent | デリゲートをObservableに変換する。 |
Observable<Unit> / Observable<T>
|
FromEventHandler | EventHandlerを用いたデリゲートをObservableに変換する。 | Observable<(Object, TEventArgs)> |
Interval | 一定間隔でOnNextを発行する。 | Observable<Unit> |
IntervalFrame | 一定フレーム間隔でOnNextを発行する。 | Observable<Unit> |
Merge | Observableを並列に合成する。 | Observable<T> |
Never | 何も発行しないObservableを生成する。 | Observable<Unit> |
NextFrame | 必ず次のフレームで1回だけOnNext(Unit)を発行し、その直後にOnCompletedを発行する。Observable.YeildFrame()と似ているが、こちらは必ず次のフレームで発行されることが保障される。 | Observable<Unit> |
ObservePropertyChanged | INotifyPropertyChangedをObservableに変換する。 | Observable<TProperty> |
ObservePropertyChanging | INotifyPropertyChangingをObservableに変換する。 | Observable<TProperty> |
Range | 指定した数値から連番で指定個数OnNext(int)を発行し、最後にOnCompletedを発行する。 | Observable<int> |
Repeat | 指定した値を指定した回数繰り返し発行し、最後にOnCompletedを発行する。 | Observable<T> |
Return | 指定した値を1回だけ発行し、OnCompletedを発行する。発行タイミングは調整可能。 | Observable<T> |
ReturnFrame | 指定した値を1回だけ発行し、OnCompletedを発行する。発行するフレームは調整可能。 | Observable<T> |
ReturnOnCompleted | OnCompletedを発行する。Resultの成否は指定可能。発行タイミングは調整可能。 | Observable<T> |
ReturnUnit | OnNext(Unit)を発行し、OnCompletedを発行する。 | Observable<Unit> |
Throw | 異常終了のOnCompletedを発行する。Observable.ReturnOnCompleted(Result.Failure(exception))と同義。 | Observable<T> |
Timer | 指定時間後にOnNext(Unit)を1回発行し、OnCompletedを発行する。 | Observable<Unit> |
Timer | 指定時間後にOnNext(Unit)を1回発行し、その後は別途指定した時間間隔ごとにOnNext(Unit)を発行し続ける。 | Observable<Unit> |
TimerFrame | 指定フレーム後にOnNext(Unit)を1回発行し、OnCompletedを発行する。 | Observable<Unit> |
TimerFrame | 指定フレーム後にOnNext(Unit)を1回発行し、その後は別途指定したフレーム間隔ごとにOnNext(Unit)を発行し続ける。 | Observable<Unit> |
ToObservable | Task/ValueTask/IEnumerable/IAsyncEnumerable/System.IObservableをR3.Observableに変換する。Rx/UniRxからR3に接続する時にもこれを使う。 | Observable<T> |
Yield | 指定のTimeProviderの次の実行タイミングにメッセージを発行する。TimeProvider未指定の場合はThreadPoolからメッセージ発行される。 | Observable<T> |
YieldFrame | 指定のFrameProviderの次の実行タイミングにメッセージを発行する。Observable.NextFrame()に似ているが、こちらはタイミングによっては同一フレーム内でメッセージが発行されることがある。 | Observable<T> |
Zip | 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。 | Observable<TResult> |
ZipLatest | 複数のObservableの入力のうち最新のものを1つだけ保持し、揃ったものから順次1つのOnNextとして発行する。CombineLatestとの違いはこちらは一度発行した値は再利用されない。 | Observable<TResult> |
R3とUniRxの違い
名称が変更されたオペレーター
R3のREADMEにまとまっていますが、次のオペレーターは名称が変更されています。
-
Buffer
->Chunk
-
BatchFrame
->ChunkFrame
-
Throttle
->Debounce
-
ThrottleFrame
->DebounceFrame
-
Sample
->ThrottleLast
-
SampleFrame
->ThrottleLastFrame
-
StartWith
->Prepend
-
ObserveEveryValueChanged(this T value)
->Observable.EveryValueChanged(T value)
-
Distinct(selector)
->DistinctBy
-
DistinctUntilChanged(selector)
->DistinctUntilChangedBy
-
Finally
->Do(onDisposed:)
-
Do***
->Do(on***:)
-
AsyncSubject<T>
->TaskCompletionSource<T>
-
First()
やFirstOrDefault()
など ->***Async
(またはTake(1)
,TakeLast(1)
) -
ToTask()
,ToUniTask()
->LastAsync()
またはFirstAsync()
一番大きな違いとしては、単発で値を取得する場合は~Async()
といったTask
を返すオペレーターを用いてawait
で待ちけるようになったところです。
UniRxに存在したがR3で消滅したオペレーター等
(First
とFirstAsync
などの似た挙動をするものが存在するものは除外。本当に跡形もなくなったもの。)
名前 | R3での代替方法 | 備考 |
---|---|---|
AsSingleUnitObservable | WaitAsync() |
挙動としてはAsUnitObservable().LastAsync() の方が近いが、想定用途としてはWaitAsync() の方が近い。 |
ContinueWith |
~Async() とasync/await を併用する。 |
Observable を用いた非同期処理用のオペレーター。現代ではasync/await があるのでそっちを使うことを推奨。 |
GroupBy | - | 地味に便利だったがR3では存在しない。 |
Repeat | - | R3ではObservable の再利用を推奨しないため存在しない。 |
RepeatUntil | - | R3ではObservable の再利用を推奨しないため存在しない。 |
RepeatSafe | - | R3ではObservable の再利用を推奨しないため存在しない。 |
Catch(OnError のハンドリング用途) |
- |
OnError がOnErrorResume に置き換わったため存在しない。 |
CatchIgnore | - |
OnError がOnErrorResume に置き換わったため存在しない。 |
Reatry | - |
OnError がOnErrorResume に置き換わったため存在しない。 |
OnErrorRetry | - |
OnError がOnErrorResume に置き換わったため存在しない。 |
WhenAll |
LastAsync() したものをTask.WhenAll で待ち受ける。 |
Observable を用いた非同期処理用のオペレーター。現代ではasync/await があるのでそっちを使うことを推奨。 |
Debug |
Do を用いて再現する。 |
Observable 上で起きた事象をログに吐き出すオペレーター。再現方法はこちらの記事にて紹介。
|
R3ではUniRxと設計思想が異なるため、Repeat
やRetry
といったオペレーターが消滅しています。
そのためもしこれらを用いたUniRxコードをR3に置き換える場合は結構な改修が必要になります。
これらオペレーターはOnErrorResumeAsFailure()
やForEachAsync()
をうまく使うことで挙動を再現することができるので、参考にしてください。
R3 挙動がややこしいオペレーター等の解説
-
Debounce
,ThrottleFirst
,ThrottleLast
,ThrottleFirstLast
AwaitOperation
-
Zip
,ZipLatest
,CombineLatest
,WithLatestFrom
これらオペレーターやプロパティは挙動が似ており非常にややこしいです。なのでわかりやすく挙動を確認できるページを用意したので活用してください。
Debounce系の挙動の違い
名前 | 挙動 |
---|---|
Debounce | OnNextが連続した場合は落ち着くのを待って最後の1個を取り出す。 |
ThrottleFirst | OnNextが連続した場合の最初の1個を取り出す。 |
ThrottleLast | OnNextが連続した場合の最後の1個を取り出す。 |
ThrottleFirstLast | OnNextが連続した場合の最初と最後の1個ずつを取り出す。 |
以下比較用のgif動画。
Debounce
OnNextが連続した場合は落ち着くのを待って最後の1個を取り出す。
OnNextが来るたびにその待ち時間はリセットされる。
(今回の実装では待ち時間は毎回ランダムになるように設定)
ThrottleFirst
OnNextが連続した場合は最初の1個を取り出す。一度発動すると一定時間はOnNextを遮断する。
待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)
ThrottleLast
OnNextが連続した場合は最後の1個を取り出す。待機中にOnNextが到達したら「待ち」を開始しその間に発行されたOnNextのうちの最後を取り出す。
待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)
ThrottleFirstLast
ThrottleFirst
とThrottleLast
が合体した挙動をする。
待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)
AwaitOperationの挙動の違い
SubscribeAwait
/WhereAwait
/SelectAwait
は指定するAwaitOperation
で挙動が大きく変化します。
AwaitOperation | 非同期の実行中に次のメッセージが来たときの挙動 | 備考 |
---|---|---|
Sequential | 今実行中の非同期処理を優先。余剰なメッセージはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。 | デフォルト設定がこれ |
Drop | 今実行中の非同期処理を優先。余剰なメッセージは無視してなかったことにする。 | |
Switch | 今実行中の非同期処理をキャンセル。 新しく到達したメッセージの処理を優先して開始する。 | キャンセル処理はCancellationToken を使って自分で実装する必要がある。 |
Parallel | 新しく来たメッセージを用いて即座に非同期を実行する。非同期処理が終わったものから早いもの勝ちで出力される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
SequentialParallel※ | 新しく来たメッセージを用いて即座に非同期を実行する。その非同期処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 |
maxConcurrent で同時実行数を制限できる。maxConcurrent を超える数のメッセージはキューに積まれる。 |
ThrottleFirstLast | 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 |
ThrottleFirst とThrottleLast が合体した挙動 |
※ SequentialParallelはWhereAwait/SelectAwaitでのみ利用可
以下比較用のgif動画。
Switch
新しくメッセージがきたら今実行中の非同期処理をキャンセルし、新しく到達したメッセージの処理を実行する。Debounce
と似ている。
(今回の実装では待ち時間は毎回ランダムになるように設定)
SequentialParallel
新しく来たメッセージを用いて即座に非同期を実行する。その非同期処理の終了順によらず、出力順が入力順と同じになるように順序調整される。
(今回の実装では待ち時間は毎回ランダムになるように設定)
ThrottleFirstLast
非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。
(今回の実装では待ち時間は毎回ランダムになるように設定)
Zip系の挙動の違い
名前 | 挙動 |
---|---|
Zip | 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。 |
ZipLatest | 複数のObservableの入力のうち最新のものを1つだけ保持し、すべてが揃ったらそれを1つのOnNextとして発行する。 |
CombineLatest | 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。 |
WithLatestFrom | メインとなるObservableにOnNextが入力されたタイミングで別のObservableの最新値と合成して出力する。サブObservable側に一度もOnNextが発行されていない場合は何も出力されない。 |
以下比較用のgif動画およびマーブルダイアグラム。