33
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

R3 オペレーター/ファクトリーメソッド まとめ

Last updated at Posted at 2025-02-12

この記事について

次世代RxであるR3に登場するオペレーターおよびファクトリーのまとめ記事です。

R3自体はフレームワークに依存せずにピュアなC#で用いることができるライブラリです。
ですが本記事では UniRxとの比較を行うためにUnity上でR3を動作させています。

執筆環境

  • Unity 6.0.30f1
  • R3/R3.Unity Ver.1.3.0
  • UniRx Ver.7.1.0
  • UniTask Ver.2.5.10

サンプルプロジェクト

オペレーターおよびファクトリーの動作説明のために用意したサンプルプロジェクトがあります。

また一部のオペレーターの動作確認を行えるDEMOを作成してあります。

R3について

概要

R3についての解説は以前に執筆しているのでこちらを御覧ください。

概念・用語集

本記事で登場する単語について解説が必要そうなものを取り上げて紹介します。

用語 意味 補足
Observable RxやR3における中核となる概念・オブジェクト。イベントメッセージを伝達し途中にオペレーターを介することでメッセージの加工ができる存在。イベントシーケンスやイベントストリームなどと呼ばれることもある。 いわゆる「Subscribe()して使うヤツ」のこと
Observer Observableに登録してメッセージを受信する側のオブジェクトのこと。
オペレーター Operator。Observableに連結してメッセージの加工や挙動の変更を行うことができる仕組み。
ファクトリー Factory、ファクトリーメソッド。Observableを生成するためのstaticなメソッド群のこと。
購読/購読する ObservableObserverを登録してメッセージの受信体制を構築すること。その処理の代表格がSubscribe()なので「Subscribeする」という言い回しを使うこともある。 なおForEachAsync()~Async()などもSubscribe()と同等の処理を行っている。そのため「購読する」「Subscribeする」といった言い回しをした場合はこれらオペレーターも含んでいる。とにかく「ObservableObserverを登録する処理」を引っくるめてこう呼ぶ。
メッセージ R3のObservableで扱われるOnNext/OnErrorResume/OnCompletedのこと。文脈に寄ってはOnNext単体を指すこともある。 Observableを流れてくるイベントメッセージのこと。
OnNext Observableで扱われる常用のメッセージのこと。Observable<T>Tの部分。
OnErrorResume Observableで扱われるエラー通知用のメッセージのこと。OnErrorResumeは発行されたとしてもObservableの動作は継続する。 UniRxではOnErrorの発行をもってObservableは解体されていた。
OnCompleted Observableで扱われるObservableの動作完了用のメッセージのこと。内容によって正常終了と異常終了のどちらかを表現できる。 中身が空なら正常終了。例外が入っていた場合は異常終了。
完了/完了する OnCompletedのうち正常終了の方のメッセージが発行されてObservableが動作を停止すること。 Taskが完了する」という場合もだいたい同じ意味。
異常終了 OnCompletedのうち異常終了の方のメッセージが発行されてObservableが動作を停止すること。「OnCompleted(Exception)が発行される」もこの意味と同じ。

R3のオペレーター一覧

R3のオペレーターを列挙して説明します。

「UniRx」の項目はUniRxに同じ動作をするオペレーターが存在する場合に記載しています。(括弧)で包まれている場合は「似てるけどちょっと違うもの」を挙げています。

また オペレーター名の列からそのオペレーターを実際に動作させているサンプルコード(テストコード)に飛ぶことができます。

変換系

OnNextの内容を変化させるものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
AsUnitObservable AsUnitObservable Observable<Unit>に変換する。 型の情報を落とす Observable<Unit>
Cast Cast OnNextの型を指定した型にキャストする。OfTypeに似ているが、こちらはキャストできない場合はOnErrorResumeが発行される。 型変換する Observable<TResult>
Chunk Buffer 発行されたOnNextの中身を指定した個数でまとめて1つのOnNextとして発行する。まとめる必要がない場合はThrottleLastでも良い。 まとめる Observable<T[]>
Chunk Buffer 発行されたOnNextの中身を指定した時間区間でまとめて1つのOnNextとして発行する。時間のカウントはOnNextが発行されたタイミングでスタートする。まとめる必要がない場合はThrottleLastでも良い。 まとめる Observable<T[]>
Chunk Buffer 発行されたOnNextをまとめて、別のObservableのOnNextタイミングで1つのOnNextとして発行する。 まとめる Observable<T[]>
ChunkFrame BatchFrame 指定したUnityのフレーム区間内に発行されたOnNextをまとめて1つのOnNextして発行する。 まとめる Observable<T[]>
ChunkUntil - 発行されたOnNextの中身を条件を満たすまでまとめ1つのOnNextとして発行する。 まとめる Observable<T[]>
FrameCount - OnNextが発行されたときに、それが何フレーム目に発行されたのかとセットにする。何フレーム目であるかはFrameProviderによって定まる。 時間を測る Observable<(long, T)>
FrameInterval FrameInterval OnNextが発行されたときに、1つ前のOnNext発行から何フレーム経過しているかとセットにする。フレーム数のカウントはFrameProviderによって定まる。 時間を測る Observable<(long, T)>
Index (Select) OnNextメッセージにIndexを振る。 採番する Observable<(int, T)>
Pairwise Pairwise 現在のOnNextの値と、その1つ前のOnNextの値をペアにする。 直前の値と最新の値を取り出す Observable<(T, T)>
Scan Scan OnNextが発行されるたびに畳み込み(fold)計算を行いその結果をOnNextとして発行する。AggregateAsyncに似ているが、こちらは型がObservable<T>である。 畳み込む Observable<T>
Select Select OnNextの値を用いて処理を実行しその結果に内容を差し替える。 変換する Observable<T>
SelectAwait - OnNextの値を用いて非同期処理を実行しその結果待って新しいOnNextとして発行する。指定するAwaitOperationで挙動が変化する。 変換する Observable<T>
SelectMany SelectMany Observable<T>から新たなObservable<TResult>を生成し、それを並列に合成する。 変換する、flatMap Observable<TResult>
TimeInterval TimeInterval/FrameTimeInterval 直前のOnNextと今のOnNextの時間間隔を計測し、結果をまとめてOnNextとして発行する。 時間を測る Observable<(TimeSpan, T)>
Timestamp (Tmestamp) OnNextにタイムスタンプを付与する。タイムスタンプはTimeProvider.GetTimestapmの値である。 時間を測る Observable<(long, T)>

フィルタ系

メッセージのフィルタリングを行うものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
Debounce Throttle メッセージの流量を減らす。OnNextがまとめて発行されたときに、最後に値が発行されてから一定時間経過後に最後のOnNextを1つだけ発行する。 流量を絞る Observable<T>
Debounce - メッセージの流量を減らす。OnNextがまとめて発行されたときに、非同期処理を実行し完了時に最後のOnNextを1つだけ発行する。 流量を絞る Observable<T>
DebounceFrame ThrottleFrame メッセージの流量を減らす。OnNextがまとめて発行されたときに、最後に値が発行されてから一定フレーム経過後に最後のOnNextを1つだけ発行する。 流量を絞る Observable<T>
Distinct Distinct OnNextから重複した値のメッセージを除外する。 重複削除 Observable<T>
DistinctBy Distinct OnNextの値を用いて加工を行い、その結果として重複したメッセージを除外する。 重複削除 Observable<T>
DistinctUntilChanged DistinctUntilChanged OnNextから連続で重複した値のメッセージを除外する。 連続した重複を削除 Observable<T>
DistinctUntilChangedBy DistinctUntilChanged OnNextの値を用いて加工を行い、その結果として連続で重複したメッセージを除外する。 連続した重複を削除 Observable<T>
IgnoreElements IgnoreElements OnNextを無視する。 無視する Observable<T>
IgnoreOnErrorResume (CatchIgnore) OnErrorResumeを無視する。 無視する Observable<T>
OfType OfType OnNextの型を指定した型にキャストする。Castと似ているが、こちらは型変換ができない場合は無視される。 型変換する Observable<TResult>
Skip Skip 購読開始から指定した個数のOnNextを無視する。 OnNextを無視する Observable<T>
Skip Skip 購読開始から指定した時間OnNextを無視する。 OnNextを無視する Observable<T>
SkipFrame - 購読開始から指定したフレーム間のOnNextを無視する。 OnNextを無視する Observable<T>
SkipLast - Observable完了時に最後から指定した時間のOnNextを無視する。 OnNextを無視する Observable<T>
SkipLastFrame - Observable完了時に最後から指定したフレーム間のOnNextを無視する。 OnNextを無視する Observable<T>
SkipUntil - 非同期処理が完了するまでの間はOnNextメッセージを無視し続ける。 OnNextを無視する Observable<T>
SkipUntil - CancellationTokenがキャンセルされるまでの間はOnNextメッセージを無視し続ける。 OnNextを無視する Observable<T>
SkipUntil SkipUntil 他のObservableのOnNextが到達するまでの間はOnNextメッセージを無視し続ける。 OnNextを無視する Observable<T>
SkipWhile SkipWhile OnNextメッセージが条件を満たす間はOnNextを無視し続ける。一度条件を満たさなくなったらそれ以降のOnNextはすべて通過させる。 OnNextを無視する Observable<T>
Take Take 購読開始から指定した個数のOnNextを通過させ、指定数を超えたらOnCompletedを発行する。 最初の方の値を取り出す Observable<T>
Take Take 購読開始から指定した時間OnNextを通過させ、その時間を超えたらOnCompletedを発行する。 最初の方の値を取り出す Observable<T>
TakeFrame - 購読開始から指定したフレーム間OnNextを通過させ、そのフレーム数を超えたらOnCompletedを発行する。 最初の方の値を取り出す Observable<T>
TakeLast TakeLast OnCompletedが発行されたときに最後から指定した個数のOnNextのみを取り出して発行する。 最後の方の値を取り出す Observable<T>
TakeLast TakeLast OnCompletedが発行されたときに最後から指定した時間以内のOnNextのみを取り出して発行する。 最後の方の値を取り出す Observable<T>
TakeLastFrame - OnCompletedが発行されたときに最後から指定したフレーム数以内のOnNextのみを取り出して発行する。 最後の方の値を取り出す Observable<T>
ThrottleFirst ThrottleFirst OnNextが発行されたらまずそれを通過させ、その次以降のOnNextを遮断する。遮断状態は一定時間経過で解除される。 流量を絞る Observable<T>
ThrottleFirst - OnNextが発行されたらまずそれを通過させ、その次以降のOnNextを遮断する。遮断状態は別のObservableからOnNextが発行されたら解除される。 流量を絞る Observable<T>
ThrottleFirst - OnNextが発行されたらまずそれを通過させ、非同期処理を実行する。その非同期処理が完了するまでの間OnNextを遮断する。 流量を絞る Observable<T>
ThrottleFirstFrame ThrottleFirstFrame OnNextが発行されたら次以降のOnNextを遮断する。遮断状態は一定フレーム経過で解除される。 流量を絞る Observable<T>
ThrottleFirstLast - OnNextが発行されたらまずそれを通過させ、一定時間の間OnNextを遮断する。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 流量を絞る Observable<T>
ThrottleFirstLast - OnNextが発行されたらまずそれを通過させ、、それ以降のOnNextを遮断する。指定した別のObservableからOnNextが発行されると遮断は解除される。遮断中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 流量を絞る Observable<T>
ThrottleFirstLast - OnNextが発行されたらまずそれを通過させ、、非同期処理を実行しその実行中はそれ以降のOnNextを遮断する。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 流量を絞る Observable<T>
ThrottleFirstLastFrame - OnNextが発行されたら次以降のOnNextを遮断する。遮断状態は一定フレーム経過で解除される。遮断期間中に発行されたOnNextがあった場合は、遮断期間終了時にその最後のものを1つ発行する。 流量を絞る Observable<T>
ThrottleLast Sample OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。遮断状態は一定時間経過で解除され、解除時に一番最後に発行されていたOnNextを1つ発行する。Chunkに似ているがこちらは値をまとめることはない。 流量を絞る Observable<T>
ThrottleLast Sample OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。別のObservableを指定しそのObservableからOnNextが発行されると遮断状態は解除される。解除時に一番最後に発行されていたOnNextを1つ発行する。Chunkに似ているがこちらは値をまとめることはない。 流量を絞る Observable<T>
ThrottleLast - OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断し、非同期処理を実行する。その非同期処理が完了すると遮断状態は解除される。解除時に一番最後に発行されていたOnNextを1つ発行する。 流量を絞る Observable<T>
ThrottleLastFrame SampleFrame OnNextが発行されたらそれを含め一旦すべてのOnNextを遮断する。遮断状態は一定フレーム経過で解除され、解除時に一番最後に発行されていたOnNextを1つ発行する。ChunkFrameに似ているがこちらは値をまとめることはない。 流量を絞る Observable<T>
Where Where OnNextの値を条件でフィルタリングする。 フィルタリングする Observable<T>
WhereAwait - OnNextの値を条件でフィルタリングする。その際の条件判定に非同期処理を用いることができる。指定するAwaitOperationで挙動が変化する。 フィルタリングする Observable<T>
WhereNotNull - Observableに対し、値がnullでない場合のみにフィルタリングする。 フィルタリングする Observable<T>

時間や実行コンテキストの制御系

メッセージの発行タイミングや、実行コンテキスト(スレッド)などを変化させるものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
Delay Delay 発行されたすべての種別のメッセージを指定時間だけ遅延させる。 遅延させる Observable<T>
DelayFrame DelayFrame 発行されたすべての種別のメッセージを指定フレームだけ遅延させる。 遅延させる Observable<T>
DelaySubscription DelaySubscription Subscribeの実行を指定時間だけ遅延させる。 遅延して購読する Observable<T>
DelaySubscriptionFrame DelayFrameSubscription Subscribeの実行を指定フレームだけ遅延させる。 遅延して購読する Observable<T>
ObserveOn ObserveOn Observableの実行コンテキストを別のコンテキストに変更する。実行スレッドを変更したり、別のタイミングに移したりできる。 実行コンテキストを変える Observable<T>
ObserveOnCurrentSynchronizationContext - Observableの実行コンテキストをSynchronizationContextを用いて変更する。SynchronizationContextはObserveOnCurrentSynchronizationContextを呼び出したタイミングにおけるSynchronizationContext.Currentが用いられる。SynchronizationContext.Currentがnullの場合はスレッドプールで代用される。 実行コンテキストを変える Observable<T>
ObserveOnMainThread ObserveOnMainThread Unity専用。Observableの実行コンテキストをUnityメインスレッドに変更する。 実行コンテキストを変える Observable<T>
ObserveOnThreadPool ObserveOn Observableの実行コンテキストをThreadPoolに変更する。 実行コンテキストを変える Observable<T>
SubscribeOn SubscribeOn 「Subscribe()の内部処理」の実行コンテキストを指定できる。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。発行されるメッセージの実行コンテキストを切り替えたいときはObserveOnを使うべき。 購読のコンテキストを変える Observable<T>
SubscribeOnCurrentSynchronizationContext - 「Subscribe()の内部処理」を現在の実行コンテキスト上のSynchronizationContextを指定して行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 購読のコンテキストを変える Observable<T>
SubscribeOnMainThread 「Subscribe()の内部処理」をメインスレッド上で行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 購読のコンテキストを変える Observable<T>
SubscribeOnSynchronize - 「Subscribe()の内部処理」の実行に排他ロックをかける。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 購読のコンテキストを変える Observable<T>
SubscribeOnThreadPool 「Subscribe()の内部処理」をスレッドプール上で行う。「Subscribe()の内部処理」とはObservableにObserverを登録し各Operatorが初期化される一連の処理のことを指す。 購読のコンテキストを変える Observable<T>

Observableの性質変化系

Observableそのものの挙動を変化させるものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
Append - OnCompleted発行時に、Observableの最後に指定された値を挿入する。 最後に足す Observable<T>
DefaultIfEmpty DefaultIfEmpty OnNextが一度も発行されずにObservableが完了したときに、値を1つだけ発行する。 空のときの処理を足す Observable<T>
OnErrorResumeAsFailure - OnErrorResumeをOnCompleted(Exception)に変換する。 異常終了にする Observable<T>
Prepend StartWith Observableの購読直後に指定した値を挿入する。 最初に足す Observable<T>
TakeUntil TakeUntil 指定した別のObservableのOnNextが到達した場合にOnCompletedを発行し本流の購読を打ち切る。 購読を止める条件を追加する Observable<T>
TakeUntil TakeUntil OnNextの値が条件を満たしたときにOnCompletedを発行して購読を打ち切る。 購読を止める条件を追加する Observable<T>
TakeUntil - CancellationTokenがキャンセルされたときにOnCompletedを発行して購読を打ち切る。 購読を止める条件を追加する Observable<T>
TakeUntil - 非同期処理の完了を待機し、その完了時にOnCompletedを発行して購読を打ち切る。 購読を止める条件を追加する Observable<T>
TakeWhile TakeWhile OnNextが条件を満たし続ける間、OnNextを通過させ続ける。条件を満たさないOnNextが発行されるとOnCompletedを発行し購読を打ち切る。 購読を止める条件を追加する Observable<T>
Timeout (Timeout) OnNextの発行間隔が指定したTimeSpan以上開いたときにOnCompleted(TimeoutException)を発行してObservableを異常終了させる。 タイムアウト条件を足す Observable<T>
TimeoutFrame (TimeoutFrame) OnNextの発行間隔が指定したフレーム間隔以上開いたときにOnCompleted(TimeoutException)を発行してObservableを異常終了させる。 タイムアウト条件を足す Observable<T>
Trampoline ObserveOn(Scheduler.CurrentThread) Observableのメッセージが再帰した際に末尾再帰に変換する。 末尾再帰にする Observable<T>

Observableの合成系

複数のObservableを用いて1つのObservableを構築するものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
Amb Amb 2つのObservableを同時に購読し、OnNextが先着した方のObservableのみを採択する。 先着したものを使う Observable<T>
Catch Catch 購読中のObservableからOnCompleted(Exception)が発行された場合、指定したObservableに購読先を切り替える。 異常終了の対応 Observable<T>
CombineLatest CombineLatest 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。 複数のObservableをまとめる Observable<TResult>
Concat Concat OnCompleted発行時に次のObservableに購読先を切り替える(Observableを直列に合成する)。 直列に連結する Observable<T>
Merge Merge 2つのObservableのメッセージを混ぜて1つのObservableとして扱う(Observableを並列に合成する)。 並列合成 Observable<T>
Switch Switch Observable<`Observable`>に対してのみ利用可能。新しく発行された`Observable`に対して購読を次々に乗り換えることができる。 購読先を切り替える Observable<T>
WithLatestFrom WithLatestFrom メインとなるObservableにOnNextが入力されたタイミングで別のObservableの最新値と合成して出力する。サブObservable側に一度もOnNextが発行されていない場合は何も出力されない。 2つのObservableを合成する Observable<TResult>
Zip Zip 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。 複数のObservableをまとめる Observable<TResult>
ZipLatest ZipLatest 複数のObservableの入力のうち最新のものを1つだけ保持し、揃ったものから順次1つのOnNextとして発行する。CombineLatestとの違いはこちらは一度発行した値は再利用されない。 複数のObservableをまとめる Observable<TResult>

Hot変換系

Hot変換目的のものをここにまとめています。Hot変換については過去に記事としてまとめているのでこちらをご参照ください。

R3 UniRx 機能 一言でまとめると 戻り値
Multicast Multicast Observable<T>を購読しその結果をすべて別のISubjectに流し込む。 他のSubjectに連結する ConnectableObservable<T>
Publish Publish Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new Subject())またMulticast(new BehaviorSubject(initValue))と同義。 Hot変換する ConnectableObservable<T>
RefCount RefCount ConnectableObservable<T>のConnect()とそのDispose()を自動化する。購読者が1つ以上いる場合は自動的にConnect()し、購読者が0になったら自動的にDispose()する。Dispose()後もまた新たに購読者がきた場合は再度Connect()する。 Publish()を自動化する Observable<T>
Replay Replay Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new ReplaySubject())と同義。 Hot変換する ConnectableObservable<T>
ReplayFrame - Obsevableを購読しその結果をすべて後続に流し直す。Observableをあらかじめ購読しておく(Hot変換)ために用いる。Multicast(new ReplayFrameSubject())と同義。 Hot変換する ConnectableObservable<T>
Share Share Publish().RefCount()と同義。 Publish().RefCount()と同じ Observable<T>

Task変換系

ObservableTaskに変換して扱うことができるものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
AggregateAsync (Aggregate) OnCompleted発行時に、過去に発行されたOnNextを畳み込み(fold)計算する。Scanに似ているが、こちらはTask<T>である。 畳み込む Task<T>
AggregateByAsync - OnCompleted発行時に、過去に発行されたOnNextをkeySelectorでグループ化し、グループ単位で畳み込み(fold)計算する。 畳み込む Task<T>
AllAsync - OnCompleted発行時に、過去に発行されたすべてのOnNextが条件を満たしているかを調べる。 すべて合ってるか Task<bool>
AnyAsync - 条件を満たしたOnNextが発行されたら即座に完了するTask<bool>に変換する。ContainsAsyncとの違いはFuncを引数に取る。 どれか一個でも合ってるか Task<bool>
AverageAsync - OnCompleted発行時に、購読中に発行されたOnNextの平均値を求める。 平均する Task<double>
ContainsAsync - 指定した値を含んだOnNextが発行されたら即座に完了するTask<bool>に変換する。AnyAsyncとの違いはこちらは値そのものを引数に取る。 含むか調べる Task<bool>
CountAsync - OnCompleted発行時に、購読中に発行された条件を満たす値の個数を返す。 数える Task<int>
ElementAtAsync - 指定した個数番目(ゼロオリジン)のOnNextの値を返すTask<T>に変換する。指定した個数に達せずにObservableが完了したときはArgumentOutOfRangeExceptionとなる。 要素を取り出す Task<T>
ElementAtOrDefaultAsync - 指定した個数番目(ゼロオリジン)のOnNextの値を返すTask<T>に変換する。指定した個数に達せずにObservableが完了したときは既定値となる。 要素を取り出す Task<T>
FirstAsync (First) 条件を満たしたOnNextが発行されたら即座に完了するTask<T>に変換する。条件を満たすOnNextが発行されずにObservableが完了したときはInvalidOperationExceptionとなる。 最初のメッセージを待つ Task<T>
FirstOrDefaultAsync (FirstOrDefault) 条件を満たしたOnNextが発行されたら即座に完了するTask<T>に変換する。条件を満たすOnNextが発行されずにObservableが完了したときは既定値となる。 最初のメッセージを待つ Task<T>
IsEmptyAsync - OnNextが一度も発行されずにObservableが完了したかどうかを判定するTask<bool>に変換する。 空か調べる Task<bool>
LastAsync (Last) 一番最後に条件を満たしたOnNextを返すTask<T>に変換する。条件を満たすOnNextが発行されずにObservableが完了したときはInvalidOperationExceptionとなる。 最後のメッセージを待つ Task<T>
LastOrDefaultAsync (LastOrDefault) 一番最後に条件を満たしたOnNextを返すTask<T>に変換する。条件を満たすOnNextが発行されずにObservableが完了したときは既定値となる。 最初のメッセージを待つ Task<T>
LongCountAsync - OnCompleted発行時に、購読中に発行された条件を満たす値の個数を返す。CountAsyncとの違いは型がlongになっている。 数える Task<long>
MaxAsync - Observable<T>の完了時に指定した条件下での最大値(TResult型)を返すTaskに変換する。MaxByAsyncとの違いはこちらは「結果となる最大値そのもの」を返す。 最大値を調べる Task<TResult>
MaxByAsync - Observable<T>の完了時に指定した条件下での最大値を含んだT型を返すTaskに変換する。MaxAsyncとの違いはこちらは「最大値を含んだもとのOnNextの値」を返す。 最大値を含むものを調べる Task<T>
MinAsync - Observable<T>の完了時に指定した条件下での最小値(TResult型)を返すTaskに変換する。MinByAsyncとの違いはこちらは「結果となる最大値そのもの」を返す。 最小値を調べる Task<TResult>
MinByAsync - Observable<T>の完了時に指定した条件下での最小値を含んだT型を返すTaskに変換する。MinAsyncとの違いはこちらは「最小値を含んだもとのOnNextの値」を返す。 最小値を含むものを調べる Task<T>
MinMaxAsync - Observable<T>の完了時に指定した条件下での最小値と最大値(TResult型)を返すTaskに変換する。 最小値と最大値を調べる Task<(TResult, TResult)>
SequenceEqualAsync - 2つのObservableが発行する値が順序含め完全に一致するかを表すTask<bool>を返す。一致しないと判断された場合は直ちにfalseで終了するが、完全に一致するかどうかの判定を行う(trueを返す)ためには両者のObservableが完了する必要がある。 一致するか調べる Task<bool>
SingleAsync (Single) 購読開始から完了までの間に、条件を満たしたOnNextが1つだけであるかを調べその結果を返すTask<T>に変換する。条件を満たすOnNextが発行されなかった、または2つ以上発行された場合はInvalidOperationExceptionとなる。 1個だけか調べる Task<T>
SingleOrDefaultAsync (SingleOrDefault) 購読開始から完了までの間に、条件を満たしたOnNextが1つだけであるかを調べその結果を返すTask<T>に変換する。条件を満たすOnNextが発行されなかった場合は既定値が発行される。2つ以上発行された場合はInvalidOperationExceptionとなる。 1個だけか調べる Task<T>
SumAsync - OnNextの合算値を返すTask<T>に変換する。このTaskはOnCompleted発行時に完了する。 合計する Task<T>
ToArrayAsync (ToArray) Observableの完了時に購読開始から発行されたOnNextを配列にまとめて返すTaskに変換する。 配列に変換する Task<T[]>
ToDictionaryAsync - Observableの完了時に購読開始から発行されたOnNextをDictionaryにまとめて返すTask<Dictionary<TKey, TElement>>に変換する。 辞書に変換する Task<Dictionary<TKey, TElement>>
ToHashSetAsync - Observableの完了時に購読開始から発行されたOnNextをHashSetにまとめて返すTask>に変換する。 HashSet<T>に変換する Task<HashSet<T>>
ToListAsync - Observableの完了時に購読開始から発行されたOnNextをListにまとめて返すTask>に変換する。 List<T>に変換する Task<List<T>>
ToLookupAsync - Observableの完了時に購読開始から発行されたOnNextをLookUpにまとめて返すTask>に変換する。 ILookUp<TKey, T>に変換する Task<ILookup<TKey, T>>
WaitAsync (Wait), (ToTask) Observableが完了するのを待機するTaskに変換する。 待つ Task

その他

上記の分類に当てはまらないもの、または特殊なものをここにまとめています。

R3 UniRx 機能 一言でまとめると 戻り値
ForEachAsync (ForEachAsync) Observableを購読しつつそれが完了まで待機するTaskに変換する。要するに完了までawaitができるSubscribe()。 Subscribe()をawatiできるようにする Task
AsObservable AsObservable Observable型に変換する。ソースとなっている型(SubjectやReactivePropertyなど)へのダウンャストを防止できる。 Observableに変換する Observable<T>
AsSystemObservable - R3.Observable<T>からSystem.IObservable<T>へと変換する。 Rx/UniRxに繋ぎこむ System.IObservable<T>
Dematerialize Dematerialize Materializeを解除する。 Materializeを解除する Observable<T>
Do Do/DoOnError/DoOnCompleted/DoOnSubscribe ObservableのOnNext/OnErrorResume/OnCompleted/Subscribeのタイミングで任意のデリゲートを実行する。 副作用を起こす Observable<T>
DoCancelOnCompleted - OnCompleted時にCancellationTokenSourceをキャンセルする。 CancellationTokenSourceと連動させる Observable<T>
Materialize Materialize OnNext/OnErrorResume/OnCompletedをすべてOnNext型に変換する。Materializeを適用したObservableはDematerializeを適用することでもとに戻る。 すべてをOnNextに変換する。 Observable<Notification<T>>
SubscribeAwait - Subscribe()のメッセージハンドリング処理のasync/await対応版。指定するAwaitOperationで挙動が変化する。 非同期処理を使う IDisopsable
Synchronize Synchronize それ以降のOnNext処理に排他ロックをかける。ただしSelectAwaitやWhereAwaitなどの非同期処理が使えるOperatorを挟んだ場合は排他ロックが外れるため注意。 排他ロックする Observable<T>
ToAsyncEnumerable - Observable<T>IAsyncEumerable<T>に変換する。 IAsyncEnumerable<T>に変換する IAsyncEnumerable<T>
ToLiveList - 発行されたOnNextをリアルタイムに格納するコレクションLiveList<T>に変換する。 リアルタイムに変動するコレクションに変換する LiveList<T>

R3のファクトリー一覧

R3のファクトリーメソッドを列挙して紹介します。

メソッド 機能 戻り値
Amb 複数のObservableを同時に購読し、OnNextが先着した方のObservableのみを採択する。 Observable<T>
CombineLatest 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。 Observable<T>
Concat OnCompleted発行時に次のObservableに購読先を切り替える(Observableを直列に合成する)。 Observable<T>
Create 手続き処理を用いて任意のタイミングでメッセージを発行するObservableを作成する。 Observable<T>
CreateFrom IAsyncEnumerableをObservable<T>に変換する。 Observable<T>
Defer Observableの評価タイミングをSubscribe時まで遅延させる。 Observable<T>
Empty 購読直後にOnCompleted()を発行する。 Observable<T>
EveryUpdate 毎フレームOnNext(Unit)を発行する。フレームタイミングは指定可能。 Observable<Unit>
EveryValueChanged 指定オブジェクトを毎フレームチェックし、差分があれば値を取り出して発行する。 Observable<TProperty>
FromAsync 非同期処理(ValueTask)をObservableに変換する。 Observable<T>
FromEvent デリゲートをObservableに変換する。 Observable<Unit> / Observable<T>
FromEventHandler EventHandlerを用いたデリゲートをObservableに変換する。 Observable<(Object, TEventArgs)>
Interval 一定間隔でOnNextを発行する。 Observable<Unit>
IntervalFrame 一定フレーム間隔でOnNextを発行する。 Observable<Unit>
Merge Observableを並列に合成する。 Observable<T>
Never 何も発行しないObservableを生成する。 Observable<Unit>
NextFrame 必ず次のフレームで1回だけOnNext(Unit)を発行し、その直後にOnCompletedを発行する。Observable.YeildFrame()と似ているが、こちらは必ず次のフレームで発行されることが保障される。 Observable<Unit>
ObservePropertyChanged INotifyPropertyChangedをObservableに変換する。 Observable<TProperty>
ObservePropertyChanging INotifyPropertyChangingをObservableに変換する。 Observable<TProperty>
Range 指定した数値から連番で指定個数OnNext(int)を発行し、最後にOnCompletedを発行する。 Observable<int>
Repeat 指定した値を指定した回数繰り返し発行し、最後にOnCompletedを発行する。 Observable<T>
Return 指定した値を1回だけ発行し、OnCompletedを発行する。発行タイミングは調整可能。 Observable<T>
ReturnFrame 指定した値を1回だけ発行し、OnCompletedを発行する。発行するフレームは調整可能。 Observable<T>
ReturnOnCompleted OnCompletedを発行する。Resultの成否は指定可能。発行タイミングは調整可能。 Observable<T>
ReturnUnit OnNext(Unit)を発行し、OnCompletedを発行する。 Observable<Unit>
Throw 異常終了のOnCompletedを発行する。Observable.ReturnOnCompleted(Result.Failure(exception))と同義。 Observable<T>
Timer 指定時間後にOnNext(Unit)を1回発行し、OnCompletedを発行する。 Observable<Unit>
Timer 指定時間後にOnNext(Unit)を1回発行し、その後は別途指定した時間間隔ごとにOnNext(Unit)を発行し続ける。 Observable<Unit>
TimerFrame 指定フレーム後にOnNext(Unit)を1回発行し、OnCompletedを発行する。 Observable<Unit>
TimerFrame 指定フレーム後にOnNext(Unit)を1回発行し、その後は別途指定したフレーム間隔ごとにOnNext(Unit)を発行し続ける。 Observable<Unit>
ToObservable Task/ValueTask/IEnumerable/IAsyncEnumerable/System.IObservableをR3.Observableに変換する。Rx/UniRxからR3に接続する時にもこれを使う。 Observable<T>
Yield 指定のTimeProviderの次の実行タイミングにメッセージを発行する。TimeProvider未指定の場合はThreadPoolからメッセージ発行される。 Observable<T>
YieldFrame 指定のFrameProviderの次の実行タイミングにメッセージを発行する。Observable.NextFrame()に似ているが、こちらはタイミングによっては同一フレーム内でメッセージが発行されることがある。 Observable<T>
Zip 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。 Observable<TResult>
ZipLatest 複数のObservableの入力のうち最新のものを1つだけ保持し、揃ったものから順次1つのOnNextとして発行する。CombineLatestとの違いはこちらは一度発行した値は再利用されない。 Observable<TResult>

R3とUniRxの違い

名称が変更されたオペレーター

R3のREADMEにまとまっていますが、次のオペレーターは名称が変更されています。

  • Buffer -> Chunk
  • BatchFrame -> ChunkFrame
  • Throttle -> Debounce
  • ThrottleFrame -> DebounceFrame
  • Sample -> ThrottleLast
  • SampleFrame -> ThrottleLastFrame
  • StartWith -> Prepend
  • ObserveEveryValueChanged(this T value) -> Observable.EveryValueChanged(T value)
  • Distinct(selector) -> DistinctBy
  • DistinctUntilChanged(selector) -> DistinctUntilChangedBy
  • Finally -> Do(onDisposed:)
  • Do*** -> Do(on***:)
  • AsyncSubject<T> -> TaskCompletionSource<T>
  • First()FirstOrDefault()など -> ***Async (または Take(1), TakeLast(1))
  • ToTask(), ToUniTask() -> LastAsync() または FirstAsync()

一番大きな違いとしては、単発で値を取得する場合は~Async()といったTaskを返すオペレーターを用いてawaitで待ちけるようになったところです。

UniRxに存在したがR3で消滅したオペレーター等

(FirstFirstAsyncなどの似た挙動をするものが存在するものは除外。本当に跡形もなくなったもの。)

名前 R3での代替方法 備考
AsSingleUnitObservable WaitAsync() 挙動としてはAsUnitObservable().LastAsync()の方が近いが、想定用途としてはWaitAsync()の方が近い。
ContinueWith ~Async()async/awaitを併用する。 Observableを用いた非同期処理用のオペレーター。現代ではasync/awaitがあるのでそっちを使うことを推奨。
GroupBy - 地味に便利だったがR3では存在しない。
Repeat - R3ではObservableの再利用を推奨しないため存在しない。
RepeatUntil - R3ではObservableの再利用を推奨しないため存在しない。
RepeatSafe - R3ではObservableの再利用を推奨しないため存在しない。
Catch(OnErrorのハンドリング用途) - OnErrorOnErrorResumeに置き換わったため存在しない。
CatchIgnore - OnErrorOnErrorResumeに置き換わったため存在しない。
Reatry - OnErrorOnErrorResumeに置き換わったため存在しない。
OnErrorRetry - OnErrorOnErrorResumeに置き換わったため存在しない。
WhenAll LastAsync()したものをTask.WhenAllで待ち受ける。 Observableを用いた非同期処理用のオペレーター。現代ではasync/awaitがあるのでそっちを使うことを推奨。
Debug Doを用いて再現する。 Observable上で起きた事象をログに吐き出すオペレーター。再現方法はこちらの記事にて紹介。

R3ではUniRxと設計思想が異なるため、RepeatRetryといったオペレーターが消滅しています。
そのためもしこれらを用いたUniRxコードをR3に置き換える場合は結構な改修が必要になります。
これらオペレーターはOnErrorResumeAsFailure()ForEachAsync()をうまく使うことで挙動を再現することができるので、参考にしてください。

R3 挙動がややこしいオペレーター等の解説

  • Debounce,ThrottleFirst,ThrottleLast,ThrottleFirstLast
  • AwaitOperation
  • Zip, ZipLatest, CombineLatest, WithLatestFrom

これらオペレーターやプロパティは挙動が似ており非常にややこしいです。なのでわかりやすく挙動を確認できるページを用意したので活用してください。

Debounce系の挙動の違い

名前 挙動
Debounce OnNextが連続した場合は落ち着くのを待って最後の1個を取り出す。
ThrottleFirst OnNextが連続した場合の最初の1個を取り出す。
ThrottleLast OnNextが連続した場合の最後の1個を取り出す。
ThrottleFirstLast OnNextが連続した場合の最初と最後の1個ずつを取り出す。

以下比較用のgif動画。

Debounce

debounce.gif

OnNextが連続した場合は落ち着くのを待って最後の1個を取り出す。

OnNextが来るたびにその待ち時間はリセットされる。
(今回の実装では待ち時間は毎回ランダムになるように設定)

ThrottleFirst

ThrottleFirst.gif

OnNextが連続した場合は最初の1個を取り出す。一度発動すると一定時間はOnNextを遮断する。

待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)

ThrottleLast

ThrottleLast.gif

OnNextが連続した場合は最後の1個を取り出す。待機中にOnNextが到達したら「待ち」を開始しその間に発行されたOnNextのうちの最後を取り出す。

待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)

ThrottleFirstLast

ThrottleFirstLast.gif

ThrottleFirstThrottleLastが合体した挙動をする。

待ち時間はOnNextが来てもリセットされない。
(今回の実装では待ち時間は毎回ランダムになるように設定)

AwaitOperationの挙動の違い

SubscribeAwait/WhereAwait/SelectAwaitは指定するAwaitOperationで挙動が大きく変化します。

AwaitOperation 非同期の実行中に次のメッセージが来たときの挙動 備考
Sequential 今実行中の非同期処理を優先。余剰なメッセージはキューに積む。非同期処理が終わり次第、次の1つを取り出して順番に非同期実行する。 デフォルト設定がこれ
Drop 今実行中の非同期処理を優先。余剰なメッセージは無視してなかったことにする。
Switch 今実行中の非同期処理をキャンセル。 新しく到達したメッセージの処理を優先して開始する。 キャンセル処理はCancellationTokenを使って自分で実装する必要がある。
Parallel 新しく来たメッセージを用いて即座に非同期を実行する。非同期処理が終わったものから早いもの勝ちで出力される。 maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。
SequentialParallel※ 新しく来たメッセージを用いて即座に非同期を実行する。その非同期処理の終了順によらず、出力順が入力順と同じになるように順序調整される。 maxConcurrentで同時実行数を制限できる。maxConcurrentを超える数のメッセージはキューに積まれる。
ThrottleFirstLast 非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。 ThrottleFirstThrottleLastが合体した挙動

SequentialParallelはWhereAwait/SelectAwaitでのみ利用可

以下比較用のgif動画。

Sequential

Sequential.gif

メッセージを発行順にキューに積み、1つずつ順番に処理する。デフォルト挙動がこれ。

(今回の実装では待ち時間は毎回ランダムになるように設定)

Drop

Drop.gif

今実行中の非同期処理を優先。余剰なメッセージは無視してなかったことにする。ThrottleFirstと似ている。

(今回の実装では待ち時間は毎回ランダムになるように設定)

Switch

Switch.gif

新しくメッセージがきたら今実行中の非同期処理をキャンセルし、新しく到達したメッセージの処理を実行する。Debounceと似ている。

(今回の実装では待ち時間は毎回ランダムになるように設定)

Parallel

Parallel.gif

新しく来たメッセージを用いて即座に非同期を実行する。非同期処理が終わったものから早いもの勝ちで出力される。

(今回の実装では待ち時間は毎回ランダムになるように設定)

SequentialParallel

SequentialParallel.gif

新しく来たメッセージを用いて即座に非同期を実行する。その非同期処理の終了順によらず、出力順が入力順と同じになるように順序調整される。

(今回の実装では待ち時間は毎回ランダムになるように設定)

ThrottleFirstLast

ThrottleFirstLast_Await.gif

非同期処理が実行されていないとき、新しく届いた値を処理する。非同期処理の実行中は最新の値を1つだけ保持し、非同期処理の終了時にそれを取り出して処理を行う。

(今回の実装では待ち時間は毎回ランダムになるように設定)

Zip系の挙動の違い

名前 挙動
Zip 複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。
ZipLatest 複数のObservableの入力のうち最新のものを1つだけ保持し、すべてが揃ったらそれを1つのOnNextとして発行する。
CombineLatest 複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。
WithLatestFrom メインとなるObservableにOnNextが入力されたタイミングで別のObservableの最新値と合成して出力する。サブObservable側に一度もOnNextが発行されていない場合は何も出力されない。

以下比較用のgif動画およびマーブルダイアグラム。

Zip

Zip.jpg

Zip.gif

複数のObservableの入力をキューにつめて、揃ったものから順次1つのOnNextとして発行する。

ZipLatest

ZipLatest.jpg

ZipLatest.gif

複数のObservableの入力のうち最新のものを1つだけ保持し、すべてが揃ったらそれを1つのOnNextとして発行する。

CombineLatest

CombineLatest.jpg

CombineLatest.gif

複数のObservableの入力のうち、それぞれ最新のものをセットにして1つのOnNextとして発行する。

WithLatestFrom

WithLatestFrom.jpg

WithLatestFrom.gif

メインとなるObservableにOnNextが入力されたタイミングで別のObservableの最新値と合成して出力する。サブObservable側に一度もOnNextが発行されていない場合は何も出力されない。

今回はメイン側を「first」、サブ側を「second」と命名した。

33
37
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
37

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?