UniRxのMessageBrokerとは
知ってる方には怒られそうなざっくりした説明の仕方をすると、
UniRxに実装されているObservableなストリームを利用し、Pub/Sub型で使えるようにしたものです。
対象者が観察者の参照を保持する必要がなく、MessageBrokerが仲介者になることで、
観察者も対象者もMessageBrokerだけを知っていれば良い状態にすることが出来ます。
誰がどれを知っていないといけないのかを考えなくてよくなり、
必要なメッセージだけをSubscribeしてデータだけ拾ってこれるので、
依存が入り乱れることが無くなるという点がメリットだと思いますが、
代わりにどれが何を配信/購読しているのかは管理していかないといけないため、
その点はデメリットになるかもしれません。
好みが別れるところだと思います。
メッセージの配信
MessageBrokerには MessageBroker.Default
というstatic変数が用意されており、
最初からこの変数を仲介者として利用することが出来ます。
class EventA {}
class EventB {}
class EventC {}
というデータクラスを用意すると、
MessageBroker.Default.Publish<EventA>(new EventA());
MessageBroker.Default.Publish<EventB>(new EventB());
MessageBroker.Default.Publish<EventC>(new EventC());
という感じでメッセージを配信することが出来ます。
メッセージの購読
メッセージの購読は、Publishで使った型毎に行うことが出来ます。
MessageBroker.Default.Subscribe<EventA>(x => Debug.Log('Subscribe EventA'));
これでEventAだけを購読することができ、EventBやEventCが配信されてもこのSubscribeは反応しません。
配信時に設定されたデータを購読側で受け取ることが出来る為、
情報の伝達で困ることはそれほど無いと思います。
UniRxのMessageBrokerでWhenAll出来なかった
複数のメッセージを同時に待ち受けて、待ち合わせてから次の処理を行いたいと思い、
Observable.WhenAll(
MessageBroker.Default.Receive<EventA>(),
MessageBroker.Default.Receive<EventB>(),
MessageBroker.Default.Receive<EventC>()
).Subscribe(x => Debug.Log('OK')) // OKじゃない
で出来るのではと簡単に考えていたのですが、これだとReceive<T>()の戻り値が
IObservable<EventeA>とIObservable<EventB>になり、Observable.WhenAllは
引数が同一の型でないと受け付けないため、シンタックスエラーになります。
解決策
Observable.WhenAll(
MessageBroker.Default.Receive<EventA>().Take(1).AsUnitObservable(),
MessageBroker.Default.Receive<EventB>().Take(1).AsUnitObservable()
MessageBroker.Default.Receive<EventC>().Take(1).AsUnitObservable()
).Subscribe(x => Debug.Log('OK')) // OK
MessageBrokerでObservable.WhenAllを正常に動作させる条件が2つありました。
一つは、上記の通り戻り値の型を一致させること。
これはUniRxにAsUnitObservable()メソッドが用意されており、
これを使うことで戻り値をIObservableで統一することが出来ます。
もう一つは、Observable.WhenAllはOnCompleteしか認識しないことです。
MessageBroker.Default.Receive().AsUnitObservable()で型を統一することは出来るのですが、
そのままだとOnNextしか発行されないため、Observable.WhenAllが処理が完了したと確認することが出来ません。
今回の場合は初期化処理での待ち合わせだったため、すべて1回のみ購読出来ればそれでOKだったので、
Take(1)で、1回購読したら処理完了(=OnCompleteが発行される)とし、動作させることが出来ました。
[追記]
コメントにて、より便利でコード量の少ない書き方を教えていただきました。
kyubunsさん、ありがとうございます!
途中で試したこと(ハズレでした)
Coroutineに変換してみる
Observable.WhenAll(
MessageBroker.Default.Receive<EventA>().toAwaitCoroutine(),
MessageBroker.Default.Receive<EventB>().toAwaitCoroutine(),
MessageBroker.Default.Receive<EventC>().toAwaitCoroutine()
).Subscribe(x => Debug.Log('OK')) // NG
IEnumeratorからCoroutineに変換してみる
IEnumerator ReceiveEventA() {
yield return MessageBroker.Default.Receive<EventA>().ToAwaitableEnumerator();
}
IEnumerator ReceiveEventB() {
yield return MessageBroker.Default.Receive<EventB>().ToAwaitableEnumerator();
}
IEnumerator ReceiveEventC() {
yield return MessageBroker.Default.Receive<EventC>().ToAwaitableEnumerator();
}
Observable.WhenAll(
Observable.FromCoroutine(() => ReceiveEventA()),
Observable.FromCoroutine(() => ReceiveEventB()),
Observable.FromCoroutine(() => ReceiveEventC()),
).Subscribe(x => Debug.Log('OK')) // NG
感想
分かってみればなんでこんな簡単なことでと自分でも思うのですが、解決するのにちょっと時間がかかりました。
UniRxのソースコードを穴が開くほど見た結果、自分の思考回路に開いてる穴が見つかり、解決することが出来ました。
やはり答えはソースコードにあるのだなと再認識した一件でした。
(でももっと簡単なやり方ありそう)