はじめに
以前、UniRx オペレータ逆引きという記事は書いたのですが、正引きがなかったので改めてまとめました。
フィルタ系
名前 | 効果・用途 | 備考 |
---|---|---|
Where<T> |
1:OnNextメッセージの内容を条件式でフィルタリングする | 後ろがSelect の場合は最適化が自動的に走る |
2:OnNextメッセージの内容および発行回数でフィルタリングする | ||
OfType<T> |
指定した型にキャスト可能なメッセージのみ通す |
Cast との違いは、変換失敗時に”何もしない”こと |
IgnoreElements<T> |
すべてのOnNextメッセージをフィルタリングする | 何も通さない |
Distinct<T> |
過去に入力された同値のメッセージをフィルタリングする |
Func<TSource, TKey> で比較値の取り出し方法の指定、およびIEqualityComparer の指定が可能 |
DistinctUntilChanged<T> |
直前に入力されたメッセージと同値の場合にフィルタリングする |
Func<TSource, TKey> で比較値の取り出し方法の指定、およびIEqualityComparer の指定が可能 |
First<T> |
条件を満たした最初の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する | 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnError(InvalidOperationException) が発行される。 |
FirstOrDefault<T> |
条件を満たした最初の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する | 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnNext(default<T>) が発行される。 |
Last<T> |
条件を満たした最後の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する | 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnError(InvalidOperationException) が発行される。 |
LastOrDefault<T> |
条件を満たした最後の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する | 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnNext(default<T>) が発行される。 |
Single<T> |
条件を満たすOnNextメッセージは必ず1回入力されるという制約をつける | 入力が0回だった、または2回以上条件を満たすOnNextメッセージが入力された場合はOnError(InvalidOperationException) が発行される。 |
SingleOrDefault<T> |
条件を満たすOnNextメッセージは必ず1回入力されるという制約をつける | 入力が0回だった、または2回以上条件を満たすOnNextメッセージが入力された場合はOnNext(default<T>) が発行される。 |
Skip<T> |
1:先頭から指定した個数のOnNextメッセージを無視する | 引数はint (無視する個数) |
2:購読開始から指定した期間メッセージを無視する | 引数はTimeSpan および使用するScheduler
|
|
SkipWhile<T> |
条件を満たす間はメッセージを無視する。一度でも条件を満たさなくなるとそれ以降は常に通過させる。 | |
Take<T> |
1:先頭から指定した個数のOnNextメッセージのみ通過させ、その直後にOnCompletedメッセージを発行する | 引数はint (通過させる個数) |
2:購読開始から指定した期間のみメッセージを通過させる。その後はOnCompletedメッセージを発行する | 引数はTimeSpan および使用するScheduler
|
|
TakeWhile<T> |
条件を満たす間のみメッセージを通過させる。条件を満たさなくなるとOnCompletedメッセージを発行する | |
TakeUntil<T> |
他のObservable からのOnNextメッセージ入力を受けた瞬間に、OnCompletedメッセージを発行する |
Observable を外から止めたいときに使える |
TakeUntilDestroy<T> |
指定したGameObject およびComponent が破棄されたタイミングでOnCompletedメッセージを発行する |
GameObject などに連動してObservable を止めたいときに使える |
TakeUntilDisable<T> |
指定したGameObject およびComponent がOnDisable() されたタイミングでOnCompletedメッセージを発行する |
|
TakeLast<T> |
1:OnCompletedメッセージが入力されたタイミングで、直前のOnNextメッセージを指定の個数分取り出す | OnCompletedメッセージが入力されるまで動作しない |
2:OnCompletedメッセージが入力されたタイミング、直前のOnNextメッセージを指定の期間分取り出す | OnCompletedメッセージが入力されるまで動作しない | |
Throttle<T> |
短期間に大量にメッセージが入力された場合はそれを無視し、落ち着いたタイミングで最後の1つだけを取り出して流す | 引数はTimespan (落ち着いたと判断するまでの猶予時間) |
ThrottleFrame<T> |
短期間に大量にメッセージが入力された場合はそれを無視し、落ち着いたタイミングで最後の1つだけを取り出して流す | 引数はフレーム数 |
ThrottleFirst<T> |
1度OnNextメッセージが入力されたらそれを流し、そのあとは一定時間メッセージを無視する | 連打の防止などに使える。引数はTimespan
|
ThrottleFirstFrame<T> |
1度OnNextメッセージが入力されたらそれを流し、そのあとは一定時間メッセージを無視する | 連打の防止などに使える。引数はフレーム数 |
メッセージの変換系
名前 | 効果・用途 | 備考 |
---|---|---|
Select<T> |
1:OnNextメッセージの内容を別の値に変換する | 後ろがWhere の場合は最適化が自動的に走る |
2:OnNextメッセージおよび発行回数を用いて別の値に変換する | ||
Cast<T> |
OnNextメッセージを指定した型にキャストする | キャスト失敗時にはOnErrorメッセージが発行される |
AsUnitObservable |
OnNextメッセージの型をUnit 型に変換する |
Select(_ => Unit.Default) の省略記法 |
AsSingleUnitObservable |
Observableが終了するタイミングでUnit 型のOnNextメッセージを1回だけ発行する |
LastOrDefault().AsUnitObservable() |
メッセージやストリームの合成系
名前 | 効果・用途 | 備考 |
---|---|---|
Merge<T> |
1.複数のストリームの内容を並列に結合する | 引数はIObservable<T>[]
|
2.IObservable<IObservable<T>> をIObservable<T> にまとめる |
IObservable<IObservable<T>> に対する拡張メソッド |
|
3.複数のストリームの内容を並列に結合する |
IEnumerable<IObservable<T>> に対する拡張メソッド |
|
Concat<T> |
1.複数のストリームの内容を直列に結合する | 引数はIObservable<T>[]
|
2.IObservable<IObservable<T>> を先頭から順番に購読する |
IObservable<IObservable<T>> に対する拡張メソッド |
|
3.複数のIObservable<T> を先頭から順番に購読する |
IEnumerable<IObservable<TSource>> に対する拡張メソッド |
|
SelectMany<T> |
1.OnNextメッセージを使って新しいObservable を生成し、それを並列に合成する |
flatMap に相当する |
2.OnNextメッセージを使って新しいObservable を生成し、そのメッセージを入力値と合成してから、並列に合成する |
引数は「Func<T, IObservable<TR>> collectionSelector 」「Func<T, TC, TR> resultSelector 」の2つ |
|
ContinueWith<T> |
SelectMany<T> の軽量版。1回しか実行されない代わりに軽量。 |
1回しかメッセージが発行されない非同期処理に向く |
Switch<T> |
購読する対象のObservable を次々に切り替える |
IObservable<IObservable<T>> に対してのみ使える |
Aggregate<T> |
OnNextメッセージに対して加工を行い、その結果を記録してさらに次のメッセージに引き継ぐ |
Scan<T> との違いは、OnCompletedメッセージが発行されたタイミングで最終結果を出力する |
Scan<T> |
OnNextメッセージに対して加工を行い、その結果を記録してさらに次のメッセージに引き継ぐ |
Aggregate<T> との違いは、こちらは1回処理するたびにメッセージを発行する |
Buffer<T> |
1.複数のOnNextメッセージを指定した個数分まとめて、1つのOnNextメッセージに加工する |
count == skip
|
2.複数のOnNextメッセージを指定した個数分まとめて、1つのOnNextメッセージに加工したあと、指定個数スキップする |
count とskip を別々に設定できる。Buffer(count:2, skip:1) にすると直前のメッセージとセットにしてメッセージを発行できる。 |
|
3.指定の時間間隔でメッセージをまとめる | ||
4.指定の時間間隔でメッセージをまとめたあと、指定した時間計測を一時中断する | ||
5.個数指定と時間指定を両方組み合わせてメッセージをまとめる | 時間か個数どちらかの条件を満たしたタイミングで出力される | |
6.メッセージをまとめ、外部からメッセージ入力されたタイミングで出力する | 引数はIObservable<TWindowBoundary> 。TWindowBoundary の型はなんでもよい |
|
BatchFrame<T> |
1.指定したフレーム数の間に発行されたOnNextメッセージを1つにまとめる | フレーム数と、FrameCountType を指定できる |
2.メッセージのタイミングを指定のFrameCountType のタイミング変換する |
IObservable<Unit> の場合 |
|
PairWise<T> |
1. 1個前のメッセージと最新のメッセージをセットにして出力する |
Buffer(count:2, skip:1) とだいたいおなじ挙動 |
2. 1個前のメッセージと最新のメッセージをセットにして、それをさらに加工して出力する |
Buffer(count:2, skip:1).Select() に似ている |
|
Zip<TLeft,TRight,TResult> |
1. 型が異なる2つのストリームを購読し、値がそれぞれセットで揃ったタイミングで加工して出力する |
IObservable<TLeft> に対してIObservable<TRight> を合成し、IObservable<TResult> になる |
Zip<T> |
2. 型が同じストリームを購読し、値がそれぞれセットで揃ったタイミングで加工して出力する。最大7ストリームまで合成できる |
IObservable<T> に対する拡張メソッド |
Zip<T> |
3. 型が同じ複数のObservableをまとめ、IList<T> として出力する |
「IEnumerable<IObservable<T>> に定義された拡張メソッド」または「Observable.Zip 」から利用できる。合成できるストリーム数に上限はない |
ZipLatest<TLeft,TRight,TResult> |
1. 型が異なる2つのストリームを購読し、値がそれぞれセットで揃ったタイミングで最新値のみを取り出して加工して出力する |
Zip との違いは、こちらは余剰に入力されたメッセージは破棄される |
ZipLatest<T> |
2. 型が同じストリームを購読し、値がそれぞれセットで揃ったタイミングで最新値のみを取り出して加工して出力する。最大7ストリームまで合成できる |
Zip との違いは、こちらは余剰に入力されたメッセージは破棄される |
CombineLatest<TLeft,TRight,TResult> |
過去に入力されたメッセージ値を記憶し、それを流用して合成を強制的に行う |
Zip との違いは、値がセットにならなくてもメッセージを無理やり合成する |
WithLatestFrom<TLeft,TRight,TResult> |
1つのストリームを主軸とし、そこに別のストリームの最新値を合成する | 別のストリームのメッセージを、もう片方のストリームのタイミングで読み取る、といったことができる。例:Update() で発行されたメッセージをFixedUpdate() で読み取る |
Observable.WhenAll<T> |
複数のObservableがすべて終了状態になるのを待ち、完了時にその結果をまとめて通知する |
static メソッド |
Amb<T> |
複数のストリームのうち、もっとも早くメッセージが到達したストリームをひとつだけ採択する |
ストリームそのものを加工する
名前 | 効果・用途 | 備考 |
---|---|---|
Delay<T> |
指定した時間分、OnNextとOnCompletedメッセージを遅延させる | OnErrorは素通しする |
DelayFrame<T> |
指定したフレーム分、OnNextとOnCompletedメッセージを遅延させる | OnErrorは素通しする |
DefaultIfEmpty<T> |
1回もOnNextメッセージが入力されずにOnCompletedメッセージが入力された場合に、デフォルト値を発行する | |
StartWith<T> |
1. 購読された瞬間に指定の値のOnNextメッセージを一番最初に発行する | |
StartWith<T> |
2. 購読された瞬間に指定の関数を実行しその結果をOnNextメッセージとして一番最初に発行する | 登録した関数は購読された瞬間に実行される |
StartWith<T> |
3. 購読された瞬間に指定された複数の値を一番最初に発行する | 指定された値はそれぞれ個別のOnNextメッセージとして発行される |
GroupBy<TSource, TKey> |
1. OnNextメッセージを指定した条件でグループ分けを行い、それぞれに分割したストリームとして出力する | 戻り値がIObservable<IGroupedObservable<TKey, TSource>> 型になる |
GroupBy<TSource, TKey, TElement> |
2. OnNextメッセージを指定した条件でグループ分けを行い、それぞれに分割したストリームにした上でそれを変換してから出力する | 戻り値がIObservable<IGroupedObservable<TKey, TElement>> 型になる |
Repeat<T> |
OnCompletedメッセージが入力されると、自分より上流のストリームに対して再購読を実行する | ストリームが完了したときにSubscribe() を再実行してくれる。無限ループに注意
|
RepeatSafe<T> |
Repeat<T> とほぼ同じ。ただし、OnCompletedメッセージが連続して入力されると処理を中断する |
Repeat<T> を安全にしたもの |
RepeatUntilDestroy<T> |
指定したGameObject かComponent が破棄されるまでの間、Repeat<T> として機能する |
Repeat<T> を安全にしたもの |
RepeatUntilDisable<T> |
指定したGameObject かComponent がDisableされるまでの間、Repeat<T> として機能する |
Repeat<T> を安全にしたもの |
Sample<T> |
1. OnNextメッセージを指定の時間間隔でサンプリングする | 一定時間ごとに、そのときの最新値を出力する |
Sample<T> |
2. OnNextメッセージを別のストリームの入力タイミングでサンプリングする | 別ストリームから入力があるたびに、そのときの最新値を出力する |
SampleFrame<T> |
OnNextメッセージを指定のフレーム間隔でサンプリングする | |
Timeout<T> |
1. OnNextメッセージの発行間隔が一定時間あいたらOnErrorメッセージを発行する | |
Timeout<T> |
2. 指定の時刻までにストリームが完了しなかったらOnErrorメッセージを発行する | |
TimeoutFrame<T> |
OnNextメッセージの発行間隔が一定フレーム数あいたらOnErrorメッセージを発行する | |
Timestamp<T> |
OnNextメッセージにそのメッセージが発行された時刻を付与する | 戻り値はIObservable<Timestamped<T>>
|
TimeInterval<T> |
OnNextメッセージが発行された時間間隔を計測し、それをメッセージに付与する | 戻り値はIObservable<TimeInterval<T>>
|
FrameTimeInterval<T> |
OnNextメッセージが発行された時間間隔を計測し、それをメッセージに付与する。ただし時間の計測にUnityのTime.time またはTime.unscaledTime を用いる |
時間の計測方法が違う以外はTimeInterval<T> と同じ |
FrameInterval<T> |
直前のOnNextメッセージとの経過フレーム数をOnNextメッセージに付け加える | |
Synchronize<T> |
メッセージ処理に排他ロックを行う |
lock に用いるオブジェクトを複数ストリームで共有することもできる |
エラーハンドリング
名前 | 効果・用途 | 備考 |
---|---|---|
Catch<T, TException> |
指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、任意のストリームに差し替える。型が一致しないOnErrorメッセージは無視する | 引数の型はFunc<TException, IObservable<T>> errorHandler 。TException の型は必ず明示しないといけない |
Observable.Catch<T> |
指定された複数のストリームを先頭から順番に購読し、OnNextメッセージをそのまま伝える。途中でOnErrorメッセージが発生すると次のストリームにスイッチする。OnCompletedメッセージが発行されたタイミングで終了する | 失敗したら次へ、失敗したら次へ、成功するまで繰り返す |
CatchIgnore<T, TException> |
指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、Observable.Empty<T> に差し替える。型が一致しないOnErrorメッセージは無視する |
エラーが起きたときにもみ消して終了する |
Retry<T> |
OnErrorメッセージが発行されたときに、上流のストリームを再購読する | 計何回まで試行するか指定できる |
OnErrorRetry<T> |
1. OnErrorメッセージが発行されたときに、上流のストリームを再購読する | 引数を何も指定しない場合。Retry<T> と同じ挙動をする |
OnErrorRetry<T, TException> |
2. 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、上流のストリームを再購読する |
Catch + Retry の複合 |
OnErrorRetry<T, TException> |
3. 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、一定時間待ってから上流のストリームを再購読する |
Catch + Retry の複合だが、リトライするまでのディレイをつけられる |
Finally<T> |
ストリームが解体されるタイミングで登録した関数を実行する | ストリームの解体とは次に挙げるシチュエーションのこと。「OnCompletedメッセージが発行される」「OnErrorメッセージが発行される」「Disposeにより購読が中断される」「Subscribeで登録した関数の処理途中で例外が発生する」 |
Hot変換用
名前 | 効果・用途 | 備考 |
---|---|---|
Multicast<T> |
指定のISubject を用いてHot変換する |
|
Publish<T> |
1. Subject<T> を用いてHot変換する |
Multicast(new Subject<T>()) に相当 |
Publish<T> |
2. BehaviorSubject<T> を用いてHot変換する |
初期値を設定した場合はこちらの挙動になる。Multicast(new BehaviorSubject<T>()) に相当 |
PublishLast<T> |
AsyncSubject<T> を用いてHot変換する |
Multicast(new AsyncSubject<T>()) に相当。OnErrorメッセージもキャッシュしてしまうのでエラーハンドリングに注意 |
Replay<T> |
ReplaySubject<T> を用いてHot変換する |
Multicast(new ReplaySubject<T>()) に相当 |
RefCount<T> |
Hot変換時、自分を購読するObserver がいる場合にストリームを稼働させる。Observer がいなくなると自動で停止する |
IConnectableObservable<T> に対してのみ利用可能 |
Share<T> |
Publish<T>().RefCount() の省略記法 |
Schedulerの切り替え
名前 | 効果・用途 | 備考 |
---|---|---|
ObserveOn<T> |
メッセージの実行コンテキストを指定のSchedulerに切り替える | |
ObserveOnMainThread<T> |
メッセージの実行コンテキストをUnityメインスレッドに切り替える |
MainThreadDispatchType でメインスレッド上のどのタイミングにするかを指定できる |
SubscribeOn<T> |
ストリームの初期構築を指定のSchedulerで行う | あまり使うことはない |
SubscribeOnMainThread<T> |
ストリームの初期構築をUnityメインスレッド上で行う | |
DelaySubscription<T> |
1. 指定された時間だけ待ってからSubscribe() を実行する |
購読開始のタイミングをずらせる |
DelaySubscription<T> |
2. 指定された時刻になったらSubscribe() を実行する |
購読開始のタイミングを指定できる |
DelayFrameSubscription<T> |
指定されたフレーム数だけ待ってからSubscribe() を実行する |
UpdateやFixedUpdateのカウント数を指定できる |
その他
名前 | 効果・用途 | 備考 |
---|---|---|
Do<T> |
ストリームのメッセージを用いて副作用を起こす。メッセージそのものは加工しない | ストリームの途中で、ストリーム外部の状態を変化させるときに使う |
DoOnError<T> |
OnErrorメッセージを用いて副作用を起こす。メッセージそのものは加工しない | OnErrorメッセージのみに反応 |
DoOnCompleted<T> |
OnCompletedメッセージを用いて副作用を起こす。メッセージそのものは加工しない | OnCompletedメッセージのみに反応 |
DoOnTerminate<T> |
OnErrorまたはOnCompletedメッセージが発行されたとき副作用を起こす。メッセージそのものは加工しない | OnErrorメッセージまたはOnCompletedメッセージに反応 |
DoOnSubscribe<T> |
ストリームがSubscribe() されたときに副作用を起こす |
購読された瞬間をログに出したりするとデバッグに便利 |
DoOnCancel<T> |
ストリームがDispose() によってキャンセルされたときに副作用を起こす |
OnErrorメッセージやOnCompletedメッセージには反応しない |
ForEachAsync<T> |
非同期処理の結果を伝搬しつつ、そのメッセージ消費を行う |
Do().Last().AsUnitObservable() に挙動は似ている。AsyncReactiveCommandと組み合わせると非常に便利 |
Materialize<T> |
すべてのメッセージをOnNext(Notification<T>) メッセージへと変換する |
OnErrorメッセージやOnCompletedメッセージをOnNextメッセージに格納してしまう。どんなメッセージが発行されたのかを列挙できるので、テストするときに便利 |
Dematerialize<T> |
Materialize<T> によって変換されたメッセージをもとに戻す |
|
AsObservable<T> |
指定のオブジェクトのインタフェースをIObservable<T> に制限する |
オブジェクトのダウンキャストやクロスキャストを防止できる |
ToArray<T> |
発行されたOnNextメッセージをすべてキャッシュし、OnCompletedメッセージが入力されたタイミングで1つのOnNext(T[]) メッセージに変換して出力する |
|
ToList<T> |
発行されたOnNextメッセージをすべてキャッシュし、OnCompletedメッセージが入力されたタイミングで1つのOnNext(IList<T>) メッセージに変換して出力する |
|
Wait<T> |
ストリームが完了するのをスレッドをブロックして待機する | メインスレッドで使うとフリーズするので常用禁止 |