Unity
Rx
UniRx

UniRx オペレータ逆引き

UniRxについての記事のまとめはこちら


UniRxで「○○をやりたいけど効率的な方法がわからない!」という方のために、UniRxでできることを逆引きでまとめてみました。


前提


  • ObservableはSubscribe(またはConnect)されたタイミングで生成される

  • Observableを流れるメッセージではOnNext,OnError,OnCompletedの3種類ある

  • 「Observableの最後の値」とは「OnCompleted発行時に最後に発行されたOnNext」という意味


オペレータ一覧

*が付いているものは複数使い道がある/言い回しが違うオペレータ


ファクトリメソッド

やりたいこと
オペレータ
備考

値を1つだけ発行したい
Observable.Return

値を繰り返し発行したい
Observable.Repeat

指定した範囲で数値を発行したい
Observable.Range

Observableの定義をSubscribe時まで遅延させたい
Observable.Defer

一定時間後に値を発行したい
Observable.Timer*

指定フレーム後に値を発行したい
Observabe.TimerFrame*

一定間隔で値を発行したい
Observable.Timer*/Observable.Interval*

TimerIntervalの違いはタイマの開始タイミング

一定フレーム間隔で値を発行したい
Observabe.TimerFrame*/Observable.IntervalFrame*

TimerIntervalの違いはタイマの開始タイミング

値を発行するObservableを自分で好きなように作りたい
Observable.Create

OnErrorを直ちに発行したい
Observable.Throw

OnCompleted直ちに発行したい
Observable.Empty

何も起きないObservableを定義したい
Observable.Never

C#のEventをObservableに変換したい
Observable.FromEvent*/Observable.FromEventPattern

UnityEventをObservableに変換したい
Observable.FromEvent*

UpdateをObservableに変換したい
Observable.EveryUpdate*
実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならUpdateAsObservableの方が良い

FixedUpdateをObservableに変換したい
Observable.FixedEveryUpdate*
実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならFixedUpdateAsObservableの方が良い


メッセージのフィルタ

やりたいこと
オペレータ
備考

条件式を満たすものだけ通したい
Where
他の言語だとfilterと呼ばれる

重複したものを除きたい
Distinct

値が変化した時のみ通したい
DistinctUntilChanged

まとめて流れてきたOnNextの最後だけ通したい
Throttle*/ThrtottleFrame*

まとめて流れてきたOnNextの最初だけ通したい
ThrottleFirst*/ThrottleFirstFrame*

一番最初に到達したOnNextのみを流してObservableを完了させたい
First/FirstOrDefault

OnNextが2つ以上発行されたらエラーにしたい
Single/SingleOrDefault

Observableの最後の値だけ通したい
Last/LastOrDefault

先頭から指定した個数だけ通したい
Take

先頭から条件が成り立たなくなるまで通したい
TakeWhile

先頭から指定したObservableにOnNextが来るまで通したい
TakeUntil

先頭から指定した個数無視したい
Skip

先頭から条件が成り立つ間は無視したい
SkipWhile

先頭から指定したObservableにOnNextが来るまで無視したい
SkipUntil

型が一致するもののみ通したい(型変換も同時にしたい)
OfType<T>

OnErrorまたはOnCompletedのみを通したい
IgnoreElements


Observable自体の合成

やりたいこと
オペレータ
備考

複数のObservableのうち一番早くメッセージが来たObservableを採用したい
Amb

複数のObservableにそれぞれ1つずつメッセージが来たらそれらを合成して流したい
Zip

複数のObservableにそれぞれ1つ以上メッセージが来たらそれらを合成して流したい(それぞれのObservableの最新のメッセージのみを保持)
ZipLatest

複数のObservableのどれかに値が来たら他のObservableの過去の値と合成して流したい
CombineLatest

2つのObservableのうち片方を主軸にし、片方のObservableの最新値を合成したい
WithLatestFrom

複数のObservableを1本にまとめたい
Merge

ObservableのOnCompleted時に別のObservableを繋ぎたい
Concat

Observableの値を使って別のObservableを作り、それぞれの値を合成したい
SelectMany*
他の言語だとflatMapと呼ばれる

複数のObservableを成功するまで順番に実行したい
Observable.Catch

Catch(IEnumerable<IObservable<T>>)を使うと、順番に成功するまで1つずつ試行してくれる


Observable自体の変換

やりたいこと
オペレータ
備考

ObservableをReactivePropertyに変換したい
ToReactiveProperty*

ObservableをReadOnlyReactivePropertyに変換したい
ToReadOnlyReactiveProperty

コルーチンでObservableを待ちたい
ToYieldInstruction

OnCompletedがくるまでコルーチン上で待機できる


Observableの分岐

やりたいこと
オペレータ
備考

Observableを枝分かれさせたい
Publish/ToReactivePropety*
Publishの返り値はIConnectabaleObservableMulticast(Subject)と同義

Observableを枝分かれさせつつ、初期値を指定したい
Publish
引数を与えるとMulticast(BehaviorSubject)と同義になる

Observableを枝分かれさせ、その際にObservableの最後の値のみをキャッシュしたい
PublishLast

Multicast(AsyncSubject)と同義

Observableを枝分かれさせ、その際に今までに発行された全てのOnNextをキャッシュしたい
Replay

Multicast(ReplaySubject)と同義

Observableを枝分かれさせる時にSubjectを指定したい
Multicast

Observerが1つでもいたらConnectし、いなくなったらDisposeしてほしい
RefCount

Publish().RefCount()はほぼ定型文

Publish().RefCount()を省略したい
Share


メッセージ同士の合成・演算

やりたいこと
オペレータ
備考

メッセージの値と前回の結果との両方を使い関数を適用したい
Scan
LINQでいうAggregate

メッセージを一定個数ごとにまとめたい
Buffer*
第二引数を指定することで挙動が変わる→詳細

あるObservableにメッセージが来るまで値を塞き止めてまとめておきたい
Buffer*
引数にObservableを渡す

直前のメッセージとセットにしたい
PairWise

Bufer(2,1)と挙動は似ている


メッセージの変換

やりたいこと
オペレータ
備考

値を変換したい/値に関数を適用したい
Select
他の言語だとmapと呼ばれる

型変換をしたい
Cast<T>

メッセージの値を元に別のObservableを呼び出してそちらの結果を利用したい
SelectMany*
Observableを合成する

メッセージにイベントのメタ情報を付与したい
Materialize
OnNext/OnError/OnCompletedのどれであるかを示す情報を付与する

前回のメッセージからの経過時間を付与したい
TimeInterval

メッセージにタイムスタンプを付与したい
TimeStamp

メッセージをUnit型に変換したい
AsUnitObservable

Select(_=>Unit.Default)と同義


時間に絡んだ処理

やりたいこと
オペレータ
備考

一定時間後に値を発行したい
Observable.Timer*/Observabe.TimerFrame*
引数を1つだけ指定した場合はOneShotになる

一定間隔で値を発行したい
Observable.Timer*/Observable.Interval*

TimerIntervalの違いはタイマの開始タイミング

一定フレーム間隔で値を発行したい
Observabe.TimerFrame*/Observable.IntervalFrame*

TimerIntervalの違いはタイマの開始タイミング

メッセージを時間遅延させたい
Delay/DelayFrame

Subscribeしてから一定時間以内にOnNextが来なかったらOnErrorを発行したい
Timeout

一定時間以内にまとめて値が来たら落ち着くまで待ってから最後の値を流したい
Throttle*/ThrottleFrame*

値が来たら一定時間Observableを遮断したい
ThrottleFirst*/ThrottleFirstFrame*

一定間隔で値を取り出したい
Sample

次のフレームで処理がしたい
Observable.NextFrame


非同期処理

やりたいこと
オペレータ
備考

処理を別スレッドで実行したい
Observale.ToAsync/Observable.Start

ToAsyncを使った場合はInvokeを呼び出すことで処理が始まる

Observableのメッセージ処理スレッドを切り替えたい
ObserveOn

Observableのメッセージ処理スレッドをUnityのメインスレッドへ切り替えたい
ObserveOnMainThread
別スレッドからUnityの処理を呼び出すときは必須

Observable構築の実行スレッドを切り替えたい
SubscribeOn

Observableの結果をコルーチン上で待ち受けたい
ToYieldInstruction
便利なので覚えておきたい

非同期処理を連鎖させたい
ContinueWith
SelectManyの単発版

ObserveOnとSubscribeOnがややこしいので注意が必要です。

SubscribeOnは「Subscribeした瞬間の、Observableを構築する処理をどのスレッド上で実行するか」を指定するオペレータです。

Subscribe(x=> /*ここの処理*/ )の実行スレッドを指定したい場合に使うべきオペレータはObserveOnの方です。


エラーハンドリング

やりたいこと
オペレータ
備考

OnErrorが来たら再度Subscribeしたい
Retry

OnErrorを受け取りエラー処理がしたい
Catch

OnErrorを受け取りエラー処理をした後、OnErrorを握りつぶしてOnCompletedに差し替える
CatchIgnore

OnErrorが来たらエラー処理をした後に一定時間後にSubscribeし直してほしい
OnErrorRetry


Observableの完了時の処理

やりたいこと
オペレータ
備考

OnCompletedが来たらもう一度Subscribeしたい
Repeat
気をつけないと無限ループを引き起こす

OnCompletedが来たらもう一度Subscribeしたい、ただし短期間にSubscribeが繰り返された時はRepeatを中止したい
RepeatSafe
無限ループ防止版。ただUncontrollableなのでオススメしない…

OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectがDisableになったらRepeatを中止したい
RepeatUntilDisable
Repeatより安全

OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectが破棄されたらRepeatを中止したい
RepeatUntilDestory
Repeatより安全

OnCompletedまたはOnErrorが来た時に処理をしたい
Finally


その他

やりたいこと
オペレータ
備考

Subscribe時に初期値を流したい
StartWith

メッセージの結果をT[]に変換したい
ToArray
OnCompletedが来ないと発動しない

Observableの結果を同期で待ち受けたい
Wait

Observableの途中の結果をログに出したい
Do

Observableの途中で副作用を起こしたい
Do

Subscribeされたときに処理をしたい
DoOnSubscribe

この場でメッセージを消費して、後続にはUnitを流したい
ForEachAsync
Last+Do+AsUnitObservableに近い

メッセージの処理に排他ロックを掛けたい
Synchronize
lockに用いるオブジェクトを指定することもできる


オペレータ以外


Subject系

やりたいこと
Subject

手続き的にObservableを構築して値を発行したい
Subject

Subjectに初期値を持たせたい/Subscribe時に直前の値を発行したい
BehaviorSubject

Subjectに過去に発行した全ての値をキャッシュさせSubscribe時にまとめて発行したい
ReplaySubject

Subjectを非同期処理に使いたい/最後のOnNextを1つだけキャッシュさせて発行したい
AsyncSubject

変数に対してSubscribeしたい
ReactiveProperty