48
33

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

UniRx オペレータ一覧

Posted at

はじめに

以前、UniRx オペレータ逆引きという記事は書いたのですが、正引きがなかったので改めてまとめました。

フィルタ系

名前 効果・用途 備考
Where<T> 1:OnNextメッセージの内容を条件式でフィルタリングする 後ろがSelectの場合は最適化が自動的に走る
2:OnNextメッセージの内容および発行回数でフィルタリングする
OfType<T> 指定した型にキャスト可能なメッセージのみ通す Castとの違いは、変換失敗時に”何もしない”こと
IgnoreElements<T> すべてのOnNextメッセージをフィルタリングする 何も通さない
Distinct<T> 過去に入力された同値のメッセージをフィルタリングする Func<TSource, TKey>で比較値の取り出し方法の指定、およびIEqualityComparerの指定が可能
DistinctUntilChanged<T> 直前に入力されたメッセージと同値の場合にフィルタリングする Func<TSource, TKey>で比較値の取り出し方法の指定、およびIEqualityComparerの指定が可能
First<T> 条件を満たした最初の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnError(InvalidOperationException)が発行される。
FirstOrDefault<T> 条件を満たした最初の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnNext(default<T>)が発行される。
Last<T> 条件を満たした最後の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnError(InvalidOperationException)が発行される。
LastOrDefault<T> 条件を満たした最後の1つのOnNextメッセージのみを取り出したあと、OnCompletedメッセージをセットで発行する 条件式を指定可能。OnNextメッセージが1つも入力されなかった場合はOnNext(default<T>)が発行される。
Single<T> 条件を満たすOnNextメッセージは必ず1回入力されるという制約をつける 入力が0回だった、または2回以上条件を満たすOnNextメッセージが入力された場合はOnError(InvalidOperationException)が発行される。
SingleOrDefault<T> 条件を満たすOnNextメッセージは必ず1回入力されるという制約をつける 入力が0回だった、または2回以上条件を満たすOnNextメッセージが入力された場合はOnNext(default<T>)が発行される。
Skip<T> 1:先頭から指定した個数のOnNextメッセージを無視する 引数はint(無視する個数)
2:購読開始から指定した期間メッセージを無視する 引数はTimeSpanおよび使用するScheduler
SkipWhile<T> 条件を満たす間はメッセージを無視する。一度でも条件を満たさなくなるとそれ以降は常に通過させる。
Take<T> 1:先頭から指定した個数のOnNextメッセージのみ通過させ、その直後にOnCompletedメッセージを発行する 引数はint(通過させる個数)
2:購読開始から指定した期間のみメッセージを通過させる。その後はOnCompletedメッセージを発行する 引数はTimeSpanおよび使用するScheduler
TakeWhile<T> 条件を満たす間のみメッセージを通過させる。条件を満たさなくなるとOnCompletedメッセージを発行する
TakeUntil<T> 他のObservableからのOnNextメッセージ入力を受けた瞬間に、OnCompletedメッセージを発行する Observableを外から止めたいときに使える
TakeUntilDestroy<T> 指定したGameObjectおよびComponentが破棄されたタイミングでOnCompletedメッセージを発行する GameObjectなどに連動してObservableを止めたいときに使える
TakeUntilDisable<T> 指定したGameObjectおよびComponentOnDisable()されたタイミングでOnCompletedメッセージを発行する
TakeLast<T> 1:OnCompletedメッセージが入力されたタイミングで、直前のOnNextメッセージを指定の個数分取り出す OnCompletedメッセージが入力されるまで動作しない
2:OnCompletedメッセージが入力されたタイミング、直前のOnNextメッセージを指定の期間分取り出す OnCompletedメッセージが入力されるまで動作しない
Throttle<T> 短期間に大量にメッセージが入力された場合はそれを無視し、落ち着いたタイミングで最後の1つだけを取り出して流す 引数はTimespan(落ち着いたと判断するまでの猶予時間)
ThrottleFrame<T> 短期間に大量にメッセージが入力された場合はそれを無視し、落ち着いたタイミングで最後の1つだけを取り出して流す 引数はフレーム数
ThrottleFirst<T> 1度OnNextメッセージが入力されたらそれを流し、そのあとは一定時間メッセージを無視する 連打の防止などに使える。引数はTimespan
ThrottleFirstFrame<T> 1度OnNextメッセージが入力されたらそれを流し、そのあとは一定時間メッセージを無視する 連打の防止などに使える。引数はフレーム数

メッセージの変換系

名前 効果・用途 備考
Select<T> 1:OnNextメッセージの内容を別の値に変換する 後ろがWhereの場合は最適化が自動的に走る
2:OnNextメッセージおよび発行回数を用いて別の値に変換する
Cast<T> OnNextメッセージを指定した型にキャストする キャスト失敗時にはOnErrorメッセージが発行される
AsUnitObservable OnNextメッセージの型をUnit型に変換する Select(_ => Unit.Default)の省略記法
AsSingleUnitObservable Observableが終了するタイミングでUnit型のOnNextメッセージを1回だけ発行する LastOrDefault().AsUnitObservable()

メッセージやストリームの合成系

名前 効果・用途 備考
Merge<T> 1.複数のストリームの内容を並列に結合する 引数はIObservable<T>[]
2.IObservable<IObservable<T>>IObservable<T>にまとめる IObservable<IObservable<T>>に対する拡張メソッド
3.複数のストリームの内容を並列に結合する IEnumerable<IObservable<T>>に対する拡張メソッド
Concat<T> 1.複数のストリームの内容を直列に結合する 引数はIObservable<T>[]
2.IObservable<IObservable<T>>を先頭から順番に購読する IObservable<IObservable<T>>に対する拡張メソッド
3.複数のIObservable<T>を先頭から順番に購読する IEnumerable<IObservable<TSource>>に対する拡張メソッド
SelectMany<T> 1.OnNextメッセージを使って新しいObservableを生成し、それを並列に合成する flatMapに相当する
2.OnNextメッセージを使って新しいObservableを生成し、そのメッセージを入力値と合成してから、並列に合成する 引数は「Func<T, IObservable<TR>> collectionSelector」「Func<T, TC, TR> resultSelector」の2つ
ContinueWith<T> SelectMany<T>の軽量版。1回しか実行されない代わりに軽量。 1回しかメッセージが発行されない非同期処理に向く
Switch<T> 購読する対象のObservableを次々に切り替える IObservable<IObservable<T>>に対してのみ使える
Aggregate<T> OnNextメッセージに対して加工を行い、その結果を記録してさらに次のメッセージに引き継ぐ Scan<T>との違いは、OnCompletedメッセージが発行されたタイミングで最終結果を出力する
Scan<T> OnNextメッセージに対して加工を行い、その結果を記録してさらに次のメッセージに引き継ぐ Aggregate<T>との違いは、こちらは1回処理するたびにメッセージを発行する
Buffer<T> 1.複数のOnNextメッセージを指定した個数分まとめて、1つのOnNextメッセージに加工する count == skip
2.複数のOnNextメッセージを指定した個数分まとめて、1つのOnNextメッセージに加工したあと、指定個数スキップする countskipを別々に設定できる。Buffer(count:2, skip:1)にすると直前のメッセージとセットにしてメッセージを発行できる。
3.指定の時間間隔でメッセージをまとめる
4.指定の時間間隔でメッセージをまとめたあと、指定した時間計測を一時中断する
5.個数指定と時間指定を両方組み合わせてメッセージをまとめる 時間か個数どちらかの条件を満たしたタイミングで出力される
6.メッセージをまとめ、外部からメッセージ入力されたタイミングで出力する 引数はIObservable<TWindowBoundary>TWindowBoundaryの型はなんでもよい
BatchFrame<T> 1.指定したフレーム数の間に発行されたOnNextメッセージを1つにまとめる フレーム数と、FrameCountTypeを指定できる
2.メッセージのタイミングを指定のFrameCountTypeのタイミング変換する IObservable<Unit>の場合
PairWise<T> 1. 1個前のメッセージと最新のメッセージをセットにして出力する Buffer(count:2, skip:1)とだいたいおなじ挙動
2. 1個前のメッセージと最新のメッセージをセットにして、それをさらに加工して出力する Buffer(count:2, skip:1).Select()に似ている
Zip<TLeft,TRight,TResult> 1. 型が異なる2つのストリームを購読し、値がそれぞれセットで揃ったタイミングで加工して出力する IObservable<TLeft>に対してIObservable<TRight>を合成し、IObservable<TResult>になる
Zip<T> 2. 型が同じストリームを購読し、値がそれぞれセットで揃ったタイミングで加工して出力する。最大7ストリームまで合成できる IObservable<T>に対する拡張メソッド
Zip<T> 3. 型が同じ複数のObservableをまとめ、IList<T>として出力する IEnumerable<IObservable<T>>に定義された拡張メソッド」または「Observable.Zip」から利用できる。合成できるストリーム数に上限はない
ZipLatest<TLeft,TRight,TResult> 1. 型が異なる2つのストリームを購読し、値がそれぞれセットで揃ったタイミングで最新値のみを取り出して加工して出力する Zipとの違いは、こちらは余剰に入力されたメッセージは破棄される
ZipLatest<T> 2. 型が同じストリームを購読し、値がそれぞれセットで揃ったタイミングで最新値のみを取り出して加工して出力する。最大7ストリームまで合成できる Zipとの違いは、こちらは余剰に入力されたメッセージは破棄される
CombineLatest<TLeft,TRight,TResult> 過去に入力されたメッセージ値を記憶し、それを流用して合成を強制的に行う Zipとの違いは、値がセットにならなくてもメッセージを無理やり合成する
WithLatestFrom<TLeft,TRight,TResult> 1つのストリームを主軸とし、そこに別のストリームの最新値を合成する 別のストリームのメッセージを、もう片方のストリームのタイミングで読み取る、といったことができる。例:Update()で発行されたメッセージをFixedUpdate()で読み取る
Observable.WhenAll<T> 複数のObservableがすべて終了状態になるのを待ち、完了時にその結果をまとめて通知する staticメソッド
Amb<T> 複数のストリームのうち、もっとも早くメッセージが到達したストリームをひとつだけ採択する

ストリームそのものを加工する

名前 効果・用途 備考
Delay<T> 指定した時間分、OnNextとOnCompletedメッセージを遅延させる OnErrorは素通しする
DelayFrame<T> 指定したフレーム分、OnNextとOnCompletedメッセージを遅延させる OnErrorは素通しする
DefaultIfEmpty<T> 1回もOnNextメッセージが入力されずにOnCompletedメッセージが入力された場合に、デフォルト値を発行する
StartWith<T> 1. 購読された瞬間に指定の値のOnNextメッセージを一番最初に発行する
StartWith<T> 2. 購読された瞬間に指定の関数を実行しその結果をOnNextメッセージとして一番最初に発行する 登録した関数は購読された瞬間に実行される
StartWith<T> 3. 購読された瞬間に指定された複数の値を一番最初に発行する 指定された値はそれぞれ個別のOnNextメッセージとして発行される
GroupBy<TSource, TKey> 1. OnNextメッセージを指定した条件でグループ分けを行い、それぞれに分割したストリームとして出力する 戻り値がIObservable<IGroupedObservable<TKey, TSource>>型になる
GroupBy<TSource, TKey, TElement> 2. OnNextメッセージを指定した条件でグループ分けを行い、それぞれに分割したストリームにした上でそれを変換してから出力する 戻り値がIObservable<IGroupedObservable<TKey, TElement>>型になる
Repeat<T> OnCompletedメッセージが入力されると、自分より上流のストリームに対して再購読を実行する ストリームが完了したときにSubscribe()を再実行してくれる。無限ループに注意
RepeatSafe<T> Repeat<T>とほぼ同じ。ただし、OnCompletedメッセージが連続して入力されると処理を中断する Repeat<T>を安全にしたもの
RepeatUntilDestroy<T> 指定したGameObjectComponentが破棄されるまでの間、Repeat<T>として機能する Repeat<T>を安全にしたもの
RepeatUntilDisable<T> 指定したGameObjectComponentがDisableされるまでの間、Repeat<T>として機能する Repeat<T>を安全にしたもの
Sample<T> 1. OnNextメッセージを指定の時間間隔でサンプリングする 一定時間ごとに、そのときの最新値を出力する
Sample<T> 2. OnNextメッセージを別のストリームの入力タイミングでサンプリングする 別ストリームから入力があるたびに、そのときの最新値を出力する
SampleFrame<T> OnNextメッセージを指定のフレーム間隔でサンプリングする
Timeout<T> 1. OnNextメッセージの発行間隔が一定時間あいたらOnErrorメッセージを発行する
Timeout<T> 2. 指定の時刻までにストリームが完了しなかったらOnErrorメッセージを発行する
TimeoutFrame<T> OnNextメッセージの発行間隔が一定フレーム数あいたらOnErrorメッセージを発行する
Timestamp<T> OnNextメッセージにそのメッセージが発行された時刻を付与する 戻り値はIObservable<Timestamped<T>>
TimeInterval<T> OnNextメッセージが発行された時間間隔を計測し、それをメッセージに付与する 戻り値はIObservable<TimeInterval<T>>
FrameTimeInterval<T> OnNextメッセージが発行された時間間隔を計測し、それをメッセージに付与する。ただし時間の計測にUnityのTime.timeまたはTime.unscaledTimeを用いる 時間の計測方法が違う以外はTimeInterval<T>と同じ
FrameInterval<T> 直前のOnNextメッセージとの経過フレーム数をOnNextメッセージに付け加える
Synchronize<T> メッセージ処理に排他ロックを行う lockに用いるオブジェクトを複数ストリームで共有することもできる

エラーハンドリング

名前 効果・用途 備考
Catch<T, TException> 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、任意のストリームに差し替える。型が一致しないOnErrorメッセージは無視する 引数の型はFunc<TException, IObservable<T>> errorHandlerTExceptionの型は必ず明示しないといけない
Observable.Catch<T> 指定された複数のストリームを先頭から順番に購読し、OnNextメッセージをそのまま伝える。途中でOnErrorメッセージが発生すると次のストリームにスイッチする。OnCompletedメッセージが発行されたタイミングで終了する 失敗したら次へ、失敗したら次へ、成功するまで繰り返す
CatchIgnore<T, TException> 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、Observable.Empty<T>に差し替える。型が一致しないOnErrorメッセージは無視する エラーが起きたときにもみ消して終了する
Retry<T> OnErrorメッセージが発行されたときに、上流のストリームを再購読する 計何回まで試行するか指定できる
OnErrorRetry<T> 1. OnErrorメッセージが発行されたときに、上流のストリームを再購読する 引数を何も指定しない場合。Retry<T>と同じ挙動をする
OnErrorRetry<T, TException> 2. 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、上流のストリームを再購読する Catch + Retryの複合
OnErrorRetry<T, TException> 3. 指定した型の例外を含むOnErrorメッセージがきた場合に登録した処理を実行し、一定時間待ってから上流のストリームを再購読する Catch + Retryの複合だが、リトライするまでのディレイをつけられる
Finally<T> ストリームが解体されるタイミングで登録した関数を実行する ストリームの解体とは次に挙げるシチュエーションのこと。「OnCompletedメッセージが発行される」「OnErrorメッセージが発行される」「Disposeにより購読が中断される」「Subscribeで登録した関数の処理途中で例外が発生する」

Hot変換用

名前 効果・用途 備考
Multicast<T> 指定のISubjectを用いてHot変換する
Publish<T> 1. Subject<T>を用いてHot変換する Multicast(new Subject<T>())に相当
Publish<T> 2. BehaviorSubject<T>を用いてHot変換する 初期値を設定した場合はこちらの挙動になる。Multicast(new BehaviorSubject<T>())に相当
PublishLast<T> AsyncSubject<T>を用いてHot変換する Multicast(new AsyncSubject<T>())に相当。OnErrorメッセージもキャッシュしてしまうのでエラーハンドリングに注意
Replay<T> ReplaySubject<T>を用いてHot変換する Multicast(new ReplaySubject<T>())に相当
RefCount<T> Hot変換時、自分を購読するObserverがいる場合にストリームを稼働させる。Observerがいなくなると自動で停止する IConnectableObservable<T>に対してのみ利用可能
Share<T> Publish<T>().RefCount()の省略記法

Schedulerの切り替え

名前 効果・用途 備考
ObserveOn<T> メッセージの実行コンテキストを指定のSchedulerに切り替える
ObserveOnMainThread<T> メッセージの実行コンテキストをUnityメインスレッドに切り替える MainThreadDispatchTypeでメインスレッド上のどのタイミングにするかを指定できる
SubscribeOn<T> ストリームの初期構築を指定のSchedulerで行う あまり使うことはない
SubscribeOnMainThread<T> ストリームの初期構築をUnityメインスレッド上で行う
DelaySubscription<T> 1. 指定された時間だけ待ってからSubscribe()を実行する 購読開始のタイミングをずらせる
DelaySubscription<T> 2. 指定された時刻になったらSubscribe()を実行する 購読開始のタイミングを指定できる
DelayFrameSubscription<T> 指定されたフレーム数だけ待ってからSubscribe()を実行する UpdateやFixedUpdateのカウント数を指定できる

その他

名前 効果・用途 備考
Do<T> ストリームのメッセージを用いて副作用を起こす。メッセージそのものは加工しない ストリームの途中で、ストリーム外部の状態を変化させるときに使う
DoOnError<T> OnErrorメッセージを用いて副作用を起こす。メッセージそのものは加工しない OnErrorメッセージのみに反応
DoOnCompleted<T> OnCompletedメッセージを用いて副作用を起こす。メッセージそのものは加工しない OnCompletedメッセージのみに反応
DoOnTerminate<T> OnErrorまたはOnCompletedメッセージが発行されたとき副作用を起こす。メッセージそのものは加工しない OnErrorメッセージまたはOnCompletedメッセージに反応
DoOnSubscribe<T> ストリームがSubscribe()されたときに副作用を起こす 購読された瞬間をログに出したりするとデバッグに便利
DoOnCancel<T> ストリームがDispose()によってキャンセルされたときに副作用を起こす OnErrorメッセージやOnCompletedメッセージには反応しない
ForEachAsync<T> 非同期処理の結果を伝搬しつつ、そのメッセージ消費を行う Do().Last().AsUnitObservable()に挙動は似ている。AsyncReactiveCommandと組み合わせると非常に便利
Materialize<T> すべてのメッセージをOnNext(Notification<T>)メッセージへと変換する OnErrorメッセージやOnCompletedメッセージをOnNextメッセージに格納してしまう。どんなメッセージが発行されたのかを列挙できるので、テストするときに便利
Dematerialize<T> Materialize<T>によって変換されたメッセージをもとに戻す
AsObservable<T> 指定のオブジェクトのインタフェースをIObservable<T>に制限する オブジェクトのダウンキャストやクロスキャストを防止できる
ToArray<T> 発行されたOnNextメッセージをすべてキャッシュし、OnCompletedメッセージが入力されたタイミングで1つのOnNext(T[])メッセージに変換して出力する
ToList<T> 発行されたOnNextメッセージをすべてキャッシュし、OnCompletedメッセージが入力されたタイミングで1つのOnNext(IList<T>)メッセージに変換して出力する
Wait<T> ストリームが完了するのをスレッドをブロックして待機する メインスレッドで使うとフリーズするので常用禁止
48
33
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
48
33

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?