Unity
Rx
UniRx

UniRx オペレータ逆引き

UniRxについての記事のまとめはこちら


UniRxで「○○をやりたいけど効率的な方法がわからない!」という方のために、UniRxでできることを逆引きでまとめてみました。

前提

  • ObservableはSubscribe(またはConnect)されたタイミングで生成される
  • Observableを流れるメッセージではOnNext,OnError,OnCompletedの3種類ある
  • 「Observableの最後の値」とは「OnCompleted発行時に最後に発行されたOnNext」という意味

オペレータ一覧

*が付いているものは複数使い道がある/言い回しが違うオペレータ

ファクトリメソッド

やりたいこと オペレータ 備考
値を1つだけ発行したい Observable.Return
値を繰り返し発行したい Observable.Repeat
指定した範囲で数値を発行したい Observable.Range
Observableの定義をSubscribe時まで遅延させたい Observable.Defer
一定時間後に値を発行したい Observable.Timer*
指定フレーム後に値を発行したい Observabe.TimerFrame*
一定間隔で値を発行したい Observable.Timer*/Observable.Interval* TimerIntervalの違いはタイマの開始タイミング
一定フレーム間隔で値を発行したい Observabe.TimerFrame*/Observable.IntervalFrame* TimerIntervalの違いはタイマの開始タイミング
値を発行するObservableを自分で好きなように作りたい Observable.Create
OnErrorを直ちに発行したい Observable.Throw
OnCompleted直ちに発行したい Observable.Empty
何も起きないObservableを定義したい Observable.Never
C#のEventをObservableに変換したい Observable.FromEvent*/Observable.FromEventPattern
UnityEventをObservableに変換したい Observable.FromEvent*
UpdateをObservableに変換したい Observable.EveryUpdate* 実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならUpdateAsObservableの方が良い
FixedUpdateをObservableに変換したい Observable.FixedEveryUpdate* 実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならFixedUpdateAsObservableの方が良い

メッセージのフィルタ

やりたいこと オペレータ 備考
条件式を満たすものだけ通したい Where 他の言語だとfilterと呼ばれる
重複したものを除きたい Distinct
値が変化した時のみ通したい DistinctUntilChanged
まとめて流れてきたOnNextの最後だけ通したい Throttle*/ThrtottleFrame*
まとめて流れてきたOnNextの最初だけ通したい ThrottleFirst*/ThrottleFirstFrame*
一番最初に到達したOnNextのみを流してObservableを完了させたい First/FirstOrDefault
OnNextが2つ以上発行されたらエラーにしたい Single/SingleOrDefault
Observableの最後の値だけ通したい Last/LastOrDefault
先頭から指定した個数だけ通したい Take
先頭から条件が成り立たなくなるまで通したい TakeWhile
先頭から指定したObservableにOnNextが来るまで通したい TakeUntil
先頭から指定した個数無視したい Skip
先頭から条件が成り立つ間は無視したい SkipWhile
先頭から指定したObservableにOnNextが来るまで無視したい SkipUntil
型が一致するもののみ通したい(型変換も同時にしたい) OfType<T>
OnErrorまたはOnCompletedのみを通したい IgnoreElements

Observable自体の合成

やりたいこと オペレータ 備考
複数のObservableのうち一番早くメッセージが来たObservableを採用したい Amb
複数のObservableにそれぞれ1つずつメッセージが来たらそれらを合成して流したい Zip
複数のObservableにそれぞれ1つ以上メッセージが来たらそれらを合成して流したい(それぞれのObservableの最新のメッセージのみを保持) ZipLatest
複数のObservableのどれかに値が来たら他のObservableの過去の値と合成して流したい CombineLatest
2つのObservableのうち片方を主軸にし、片方のObservableの最新値を合成したい WithLatestFrom
複数のObservableを1本にまとめたい Merge
ObservableのOnCompleted時に別のObservableを繋ぎたい Concat
Observableの値を使って別のObservableを作り、それぞれの値を合成したい SelectMany* 他の言語だとflatMapと呼ばれる
複数のObservableを成功するまで順番に実行したい Observable.Catch Catch(IEnumerable<IObservable<T>>)を使うと、順番に成功するまで1つずつ試行してくれる

Observable自体の変換

やりたいこと オペレータ 備考
ObservableをReactivePropertyに変換したい ToReactiveProperty*
ObservableをReadOnlyReactivePropertyに変換したい ToReadOnlyReactiveProperty
コルーチンでObservableを待ちたい ToYieldInstruction OnCompletedがくるまでコルーチン上で待機できる

Observableの分岐

やりたいこと オペレータ 備考
Observableを枝分かれさせたい Publish/ToReactivePropety* Publishの返り値はIConnectabaleObservableMulticast(Subject)と同義
Observableを枝分かれさせつつ、初期値を指定したい Publish 引数を与えるとMulticast(BehaviorSubject)と同義になる
Observableを枝分かれさせ、その際にObservableの最後の値のみをキャッシュしたい PublishLast Multicast(AsyncSubject)と同義
Observableを枝分かれさせ、その際に今までに発行された全てのOnNextをキャッシュしたい Replay Multicast(ReplaySubject)と同義
Observableを枝分かれさせる時にSubjectを指定したい Multicast
Observerが1つでもいたらConnectし、いなくなったらDisposeしてほしい RefCount Publish().RefCount()はほぼ定型文
Publish().RefCount()を省略したい Share

メッセージ同士の合成・演算

やりたいこと オペレータ 備考
メッセージの値と前回の結果との両方を使い関数を適用したい Scan LINQでいうAggregate
メッセージを一定個数ごとにまとめたい Buffer* 第二引数を指定することで挙動が変わる→詳細
あるObservableにメッセージが来るまで値を塞き止めてまとめておきたい Buffer* 引数にObservableを渡す
直前のメッセージとセットにしたい PairWise Bufer(2,1)と挙動は似ている

メッセージの変換

やりたいこと オペレータ 備考
値を変換したい/値に関数を適用したい Select 他の言語だとmapと呼ばれる
型変換をしたい Cast<T>
メッセージの値を元に別のObservableを呼び出してそちらの結果を利用したい SelectMany* Observableを合成する
メッセージにイベントのメタ情報を付与したい Materialize OnNext/OnError/OnCompletedのどれであるかを示す情報を付与する
前回のメッセージからの経過時間を付与したい TimeInterval
メッセージにタイムスタンプを付与したい TimeStamp
メッセージをUnit型に変換したい AsUnitObservable Select(_=>Unit.Default)と同義

時間に絡んだ処理

やりたいこと オペレータ 備考
一定時間後に値を発行したい Observable.Timer*/Observabe.TimerFrame* 引数を1つだけ指定した場合はOneShotになる
一定間隔で値を発行したい Observable.Timer*/Observable.Interval* TimerIntervalの違いはタイマの開始タイミング
一定フレーム間隔で値を発行したい Observabe.TimerFrame*/Observable.IntervalFrame* TimerIntervalの違いはタイマの開始タイミング
メッセージを時間遅延させたい Delay/DelayFrame
Subscribeしてから一定時間以内にOnNextが来なかったらOnErrorを発行したい Timeout
一定時間以内にまとめて値が来たら落ち着くまで待ってから最後の値を流したい Throttle*/ThrottleFrame*
値が来たら一定時間Observableを遮断したい ThrottleFirst*/ThrottleFirstFrame*
一定間隔で値を取り出したい Sample
次のフレームで処理がしたい Observable.NextFrame

非同期処理

やりたいこと オペレータ 備考
処理を別スレッドで実行したい Observale.ToAsync/Observable.Start ToAsyncを使った場合はInvokeを呼び出すことで処理が始まる
Observableのメッセージ処理スレッドを切り替えたい ObserveOn
Observableのメッセージ処理スレッドをUnityのメインスレッドへ切り替えたい ObserveOnMainThread 別スレッドからUnityの処理を呼び出すときは必須
Observable構築の実行スレッドを切り替えたい SubscribeOn
Observableの結果をコルーチン上で待ち受けたい ToYieldInstruction 便利なので覚えておきたい
非同期処理を連鎖させたい ContinueWith SelectManyの単発版

ObserveOnとSubscribeOnがややこしいので注意が必要です。
SubscribeOnは「Subscribeした瞬間の、Observableを構築する処理をどのスレッド上で実行するか」を指定するオペレータです。
Subscribe(x=> /*ここの処理*/ )の実行スレッドを指定したい場合に使うべきオペレータはObserveOnの方です。

エラーハンドリング

やりたいこと オペレータ 備考
OnErrorが来たら再度Subscribeしたい Retry
OnErrorを受け取りエラー処理がしたい Catch
OnErrorを受け取りエラー処理をした後、OnErrorを握りつぶしてOnCompletedに差し替える CatchIgnore
OnErrorが来たらエラー処理をした後に一定時間後にSubscribeし直してほしい OnErrorRetry

Observableの完了時の処理

やりたいこと オペレータ 備考
OnCompletedが来たらもう一度Subscribeしたい Repeat 気をつけないと無限ループを引き起こす
OnCompletedが来たらもう一度Subscribeしたい、ただし短期間にSubscribeが繰り返された時はRepeatを中止したい RepeatSafe 無限ループ防止版。ただUncontrollableなのでオススメしない…
OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectがDisableになったらRepeatを中止したい RepeatUntilDisable Repeatより安全
OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectが破棄されたらRepeatを中止したい RepeatUntilDestory Repeatより安全
OnCompletedまたはOnErrorが来た時に処理をしたい Finally

その他

やりたいこと オペレータ 備考
Subscribe時に初期値を流したい StartWith
メッセージの結果をT[]に変換したい ToArray OnCompletedが来ないと発動しない
Observableの結果を同期で待ち受けたい Wait
Observableの途中の結果をログに出したい Do
Observableの途中で副作用を起こしたい Do
Subscribeされたときに処理をしたい DoOnSubscribe
この場でメッセージを消費して、後続にはUnitを流したい ForEachAsync Last+Do+AsUnitObservableに近い
メッセージの処理に排他ロックを掛けたい Synchronize lockに用いるオブジェクトを指定することもできる

オペレータ以外

Subject系

やりたいこと Subject
手続き的にObservableを構築して値を発行したい Subject
Subjectに初期値を持たせたい/Subscribe時に直前の値を発行したい BehaviorSubject
Subjectに過去に発行した全ての値をキャッシュさせSubscribe時にまとめて発行したい ReplaySubject
Subjectを非同期処理に使いたい/最後のOnNextを1つだけキャッシュさせて発行したい AsyncSubject
変数に対してSubscribeしたい ReactiveProperty