UniRxについての記事のまとめはこちら
UniRxで「○○をやりたいけど効率的な方法がわからない!」という方のために、UniRxでできることを逆引きでまとめてみました。
前提
- ObservableはSubscribe(またはConnect)されたタイミングで生成される
- Observableを流れるメッセージではOnNext,OnError,OnCompletedの3種類ある
- 「Observableの最後の値」とは「OnCompleted発行時に最後に発行されたOnNext」という意味
オペレータ一覧
*が付いているものは複数使い道がある/言い回しが違うオペレータ
ファクトリメソッド
やりたいこと |
オペレータ |
備考 |
値を1つだけ発行したい |
Observable.Return |
|
値を繰り返し発行したい |
Observable.Repeat |
|
指定した範囲で数値を発行したい |
Observable.Range |
|
Observableの定義をSubscribe時まで遅延させたい |
Observable.Defer |
|
一定時間後に値を発行したい |
Observable.Timer* |
|
指定フレーム後に値を発行したい |
Observabe.TimerFrame* |
|
一定間隔で値を発行したい |
Observable.Timer*/Observable.Interval* |
Timer とInterval の違いはタイマの開始タイミング |
一定フレーム間隔で値を発行したい |
Observabe.TimerFrame*/Observable.IntervalFrame* |
Timer とInterval の違いはタイマの開始タイミング |
値を発行するObservableを自分で好きなように作りたい |
Observable.Create |
|
OnErrorを直ちに発行したい |
Observable.Throw |
|
OnCompleted直ちに発行したい |
Observable.Empty |
|
何も起きないObservableを定義したい |
Observable.Never |
|
C#のEventをObservableに変換したい |
Observable.FromEvent*/Observable.FromEventPattern |
|
UnityEventをObservableに変換したい |
Observable.FromEvent* |
|
UpdateをObservableに変換したい |
Observable.EveryUpdate* |
実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならUpdateAsObservableの方が良い |
FixedUpdateをObservableに変換したい |
Observable.FixedEveryUpdate* |
実態はMainThreadDispatcher上で動くコルーチン/OnCompletedは発行されないので寿命管理に注意/普段使いならFixedUpdateAsObservableの方が良い |
メッセージのフィルタ
やりたいこと |
オペレータ |
備考 |
条件式を満たすものだけ通したい |
Where |
他の言語だとfilter と呼ばれる |
重複したものを除きたい |
Distinct |
|
値が変化した時のみ通したい |
DistinctUntilChanged |
|
まとめて流れてきたOnNextの最後だけ通したい |
Throttle*/ThrtottleFrame* |
|
まとめて流れてきたOnNextの最初だけ通したい |
ThrottleFirst*/ThrottleFirstFrame* |
|
一番最初に到達したOnNextのみを流してObservableを完了させたい |
First/FirstOrDefault |
|
OnNextが2つ以上発行されたらエラーにしたい |
Single/SingleOrDefault |
|
Observableの最後の値だけ通したい |
Last/LastOrDefault |
|
先頭から指定した個数だけ通したい |
Take |
|
先頭から条件が成り立たなくなるまで通したい |
TakeWhile |
|
先頭から指定したObservableにOnNextが来るまで通したい |
TakeUntil |
|
先頭から指定した個数無視したい |
Skip |
|
先頭から条件が成り立つ間は無視したい |
SkipWhile |
|
先頭から指定したObservableにOnNextが来るまで無視したい |
SkipUntil |
|
型が一致するもののみ通したい(型変換も同時にしたい) |
OfType<T> |
|
OnErrorまたはOnCompletedのみを通したい |
IgnoreElements |
|
Observable自体の合成
やりたいこと |
オペレータ |
備考 |
複数のObservableのうち一番早くメッセージが来たObservableを採用したい |
Amb |
|
複数のObservableにそれぞれ1つずつメッセージが来たらそれらを合成して流したい |
Zip |
|
複数のObservableにそれぞれ1つ以上メッセージが来たらそれらを合成して流したい(それぞれのObservableの最新のメッセージのみを保持) |
ZipLatest |
|
複数のObservableのどれかに値が来たら他のObservableの過去の値と合成して流したい |
CombineLatest |
|
2つのObservableのうち片方を主軸にし、片方のObservableの最新値を合成したい |
WithLatestFrom |
|
複数のObservableを1本にまとめたい |
Merge |
|
ObservableのOnCompleted時に別のObservableを繋ぎたい |
Concat |
|
Observableの値を使って別のObservableを作り、それぞれの値を合成したい |
SelectMany* |
他の言語だとflatMap と呼ばれる |
複数のObservableを成功するまで順番に実行したい |
Observable.Catch |
Catch(IEnumerable<IObservable<T>>) を使うと、順番に成功するまで1つずつ試行してくれる |
Observable自体の変換
やりたいこと |
オペレータ |
備考 |
ObservableをReactivePropertyに変換したい |
ToReactiveProperty* |
|
ObservableをReadOnlyReactivePropertyに変換したい |
ToReadOnlyReactiveProperty |
|
コルーチンでObservableを待ちたい |
ToYieldInstruction |
OnCompleted がくるまでコルーチン上で待機できる |
Observableの分岐
やりたいこと |
オペレータ |
備考 |
Observableを枝分かれさせたい |
Publish/ToReactivePropety* |
Publishの返り値はIConnectabaleObservable 。Multicast(Subject) と同義 |
Observableを枝分かれさせつつ、初期値を指定したい |
Publish |
引数を与えるとMulticast(BehaviorSubject) と同義になる |
Observableを枝分かれさせ、その際にObservableの最後の値のみをキャッシュしたい |
PublishLast |
Multicast(AsyncSubject) と同義 |
Observableを枝分かれさせ、その際に今までに発行された全てのOnNextをキャッシュしたい |
Replay |
Multicast(ReplaySubject) と同義 |
Observableを枝分かれさせる時にSubjectを指定したい |
Multicast |
|
Observerが1つでもいたらConnectし、いなくなったらDisposeしてほしい |
RefCount |
Publish().RefCount() はほぼ定型文 |
Publish().RefCount() を省略したい |
Share |
|
メッセージ同士の合成・演算
やりたいこと |
オペレータ |
備考 |
メッセージの値と前回の結果との両方を使い関数を適用したい |
Scan |
LINQでいうAggregate |
メッセージを一定個数ごとにまとめたい |
Buffer* |
第二引数を指定することで挙動が変わる→詳細
|
あるObservableにメッセージが来るまで値を塞き止めてまとめておきたい |
Buffer* |
引数にObservableを渡す |
直前のメッセージとセットにしたい |
PairWise |
Bufer(2,1) と挙動は似ている |
メッセージの変換
やりたいこと |
オペレータ |
備考 |
値を変換したい/値に関数を適用したい |
Select |
他の言語だとmap と呼ばれる |
型変換をしたい |
Cast<T> |
|
メッセージの値を元に別のObservableを呼び出してそちらの結果を利用したい |
SelectMany* |
Observableを合成する |
メッセージにイベントのメタ情報を付与したい |
Materialize |
OnNext/OnError/OnCompletedのどれであるかを示す情報を付与する |
前回のメッセージからの経過時間を付与したい |
TimeInterval |
|
メッセージにタイムスタンプを付与したい |
TimeStamp |
|
メッセージをUnit型に変換したい |
AsUnitObservable |
Select(_=>Unit.Default) と同義 |
時間に絡んだ処理
やりたいこと |
オペレータ |
備考 |
一定時間後に値を発行したい |
Observable.Timer*/Observabe.TimerFrame* |
引数を1つだけ指定した場合はOneShotになる |
一定間隔で値を発行したい |
Observable.Timer*/Observable.Interval* |
Timer とInterval の違いはタイマの開始タイミング |
一定フレーム間隔で値を発行したい |
Observabe.TimerFrame*/Observable.IntervalFrame* |
Timer とInterval の違いはタイマの開始タイミング |
メッセージを時間遅延させたい |
Delay/DelayFrame |
|
最後にOnNextが発行されてから一定時間以内に次のOnNextが来なかったらOnErrorを発行したい |
Timeout |
|
Subscribeしてから一定時刻までにOnCompletedが来なかったらOnErrorを発行したい |
Timeout |
|
一定時間以内にまとめて値が来たら落ち着くまで待ってから最後の値を流したい |
Throttle*/ThrottleFrame* |
|
値が来たら一定時間Observableを遮断したい |
ThrottleFirst*/ThrottleFirstFrame* |
|
一定間隔で値を取り出したい |
Sample |
|
次のフレームで処理がしたい |
Observable.NextFrame |
|
非同期処理
やりたいこと |
オペレータ |
備考 |
処理を別スレッドで実行したい |
Observable.ToAsync/Observable.Start |
ToAsync を使った場合はInvoke を呼び出すことで処理が始まる |
Observableのメッセージ処理スレッドを切り替えたい |
ObserveOn |
|
Observableのメッセージ処理スレッドをUnityのメインスレッドへ切り替えたい |
ObserveOnMainThread |
別スレッドからUnityの処理を呼び出すときは必須 |
Observable構築の実行スレッドを切り替えたい |
SubscribeOn |
|
Observableの結果をコルーチン上で待ち受けたい |
ToYieldInstruction |
便利なので覚えておきたい |
非同期処理を連鎖させたい |
ContinueWith |
SelectManyの単発版 |
ObserveOnとSubscribeOnがややこしいので注意が必要です。
SubscribeOnは「Subscribeした瞬間の、Observableを構築する処理をどのスレッド上で実行するか」を指定するオペレータです。
Subscribe(x=> /*ここの処理*/ )
の実行スレッドを指定したい場合に使うべきオペレータはObserveOnの方です。
エラーハンドリング
やりたいこと |
オペレータ |
備考 |
OnErrorが来たら再度Subscribeしたい |
Retry |
|
OnErrorを受け取りエラー処理がしたい |
Catch |
|
OnErrorを受け取りエラー処理をした後、OnErrorを握りつぶしてOnCompletedに差し替える |
CatchIgnore |
|
OnErrorが来たらエラー処理をした後に一定時間後にSubscribeし直してほしい |
OnErrorRetry |
|
Observableの完了時の処理
やりたいこと |
オペレータ |
備考 |
OnCompletedが来たらもう一度Subscribeしたい |
Repeat |
気をつけないと無限ループを引き起こす |
OnCompletedが来たらもう一度Subscribeしたい、ただし短期間にSubscribeが繰り返された時はRepeatを中止したい |
RepeatSafe |
無限ループ防止版。ただUncontrollableなのでオススメしない… |
OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectがDisableになったらRepeatを中止したい |
RepeatUntilDisable |
Repeatより安全 |
OnCompletedが来たらもう一度Subscribeしたい、ただし指定したGameObjectが破棄されたらRepeatを中止したい |
RepeatUntilDestory |
Repeatより安全 |
OnCompletedまたはOnErrorが来た時に処理をしたい |
Finally |
|
その他
やりたいこと |
オペレータ |
備考 |
Subscribe時に初期値を流したい |
StartWith |
|
メッセージの結果をT[] に変換したい |
ToArray |
OnCompletedが来ないと発動しない |
Observableの結果を同期で待ち受けたい |
Wait |
|
Observableの途中の結果をログに出したい |
Do |
|
Observableの途中で副作用を起こしたい |
Do |
|
Subscribeされたときに処理をしたい |
DoOnSubscribe |
|
この場でメッセージを消費して、後続にはUnitを流したい |
ForEachAsync |
Last+Do+AsUnitObservableに近い |
メッセージの処理に排他ロックを掛けたい |
Synchronize |
lockに用いるオブジェクトを指定することもできる |
オペレータ以外
Subject系
やりたいこと |
Subject |
手続き的にObservableを構築して値を発行したい |
Subject |
Subjectに初期値を持たせたい/Subscribe時に直前の値を発行したい |
BehaviorSubject |
Subjectに過去に発行した全ての値をキャッシュさせSubscribe時にまとめて発行したい |
ReplaySubject |
Subjectを非同期処理に使いたい/最後のOnNextを1つだけキャッシュさせて発行したい |
AsyncSubject |
変数に対してSubscribeしたい |
ReactiveProperty |