LoginSignup
7
4

RxをPub/Subパターンで捉えてみる

Last updated at Posted at 2021-07-18

Rxは Observerパターン を基にしていると説明されているものが多いですが、 Publisher-Subscriber(Pub/Sub)パターン で解釈した方が整理しやすい場面もあるのかなというお話です。
Rx全般に適用可能だとは思いますが、ここではUniRxのみを対象にして書きます。

Observerパターン

まず、Observerパターンの購読と通知部分を抜き出すと、以下のようになっています。

クラス構成

  • Subject
    • AddObserver(Observer)
    • NotifyObservers()
  • Observer
    • Update(通知データ)

処理の流れ

  1. Subjcet.AddObserver() を呼び出して SubjectObserver を登録する。
  2. 通知すべき情報が発生すると Subject.NotifyObservers() が呼び出される。
  3. 上記の中で Observer.Update() が呼び出され、 Observer へ通知が行われる。

Rxをこれで整理しよう考えたときに、ここには登場しない Observable をどう扱うべきか戸惑ってしまいました。
そこで、 Publisher-Subscriber(Pub/Sub)パターン をベースに考えた方が整理しやすいかもしれないと考えました。

Pub/Subパターン

Pub/Subパターンの購読と通知部分を抜き出すと、以下のようになっています。メソッド名は一例。

クラス構成

  • Publisher
    • AddBroker(Broker)
  • Broker
    • Subscribe(Subscriber)
    • Publish(通知データ)
  • Subscriber
    • OnNext(通知データ)

処理の流れ

  1. Subscriber 側から Broker.Subscribe() を呼び出し、 Broker に対して Publisher の購読と、発行時には Subscriber への通知を依頼する。
  2. Broker 側から Publisher.AddBroker() を呼び出して、 PublisherBroker を登録する。
  3. Publisher は発行タイミングが来たら、 Broker.Publish() を呼び出して Broker に発行を依頼する。
  4. Broker はそれを受けて、 Subscriber.OnNext() を呼び出して Subscriber に発行を通知する。

SubscriberBroker に対して「あの雑誌の定期購読に申し込んでおいて。私やり方知らないし。」と頼んでいる構図を思い浮かべましょう。

RxをPub/Subパターンに当て嵌めてみる

それではRxをPub/Subパターンに当て嵌めてみます。
Pub/Subパターンで考えると、各クラスの役割は以下のようになります。

  • Publisher - Subject
  • Broker - Observable
  • Subscriber - Observer

Observable の役割が定義されたので整理しやすくなりました。

Observerとは?

UniRxでのみRxに触れている場合は「 Observer なんて出てこないけど?」と思うかもしれません。
実は、頻繁に使っています。
以下のコードの Subscribe() の引数から Observer<T> が生成されるのです。

Observerをラムダ式から生成
SomeObservable.Subscribe(_ => DoSomethingOnNext());

上記のコードは下記と同じになります。

ObserverをFactoryメソッドで生成
SomeObservable.Subscribe(Observer.Create<Unit>(onNext: _ => DoSomethingOnNext());

詳細は以下の記事を参考にして下さい。
【連載:Reactive Extensions(Rx)入門】第2回 イベント・プログラミングとRx

Observableの正体

Subjcetnew Subject<T>() で自分でインスタンスを生成するし、Observerも上記の通りインスタンスを生成していることがわかりました。
では、以下の形でよく登場する Observable の実体は何者なのでしょうか。

よくあるIObservable登場パターン
private readonly ISubject<Unit> someSubject = new Subject<Unit>();
public IObservable<Unit> someObservable => this.someSubject;

これは、「Rxの SubjectObserverObservable を内包している」という定義がヒントになります。

Subject の購読、通知部分のみを簡易実装してみます。
ObserverObservable があると見間違えやすいので Broker にしておきます。

Subjectの一部を簡易実装
// ※UniRxの実装とは異なります
public class PartialSubject<T>
{
    private readonly InnerBroker innerBroker = new();

    private readonly InnerObserver innerObserver;

    // コンストラクタ
    public PartialSubject() => this.innerObserver = new(this.innerBroker);

    // Broker.Subscribe()のラッパー
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // 自身の持つ内部Brokerに購読依頼をそのままスルー
        return this.innerBroker.Subscribe(observer);
    }

    // Observer.OnNext()のラッパー
    public void OnNext(T nextItem)
    {
        // イベント発火タイミングが来た!
        // 内部Observerに通知をスルー
        this.innerObserver.OnNext(nextItem);
    }

    // 親クラスのBrokerとなる内部クラス
    private class InnerBroker : IBroker<T>
    {
        private readonly List<IObserver<T>> observers = new();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            this.observers.Add(observer);

            // Dispose時の処理は解説範囲外なので省略
            return Disposable.Empty;
        }
        
        public void Publish(T value) 
        {
            // 購読依頼を受けていたObserver達に発行を知らせる
            this.observers.ForEach(x => x.OnNext(value));
        }
    }
    
    // Subject利用クラスを購読するObserverとなる内部クラス
    private class InnerObserver : IObserver<T>
    {
        private readonly IObservable<T> broker;
        
        // コンストラクタ
        public InnerObserver(IBroker<T> broker) => this.broker = broker;
        
        public void OnNext(T value)
        {
            // 内部Observerの通知受信時の処理は、内部Brokerに発行を伝えること
            this.broker.Publish(value);
        }
    }
}

// IBrokerは IObservable + Publish()
public interface IBroker<T> : IObservable<T> {
    void Publish(T value);
}

Subject は内部に Broker (つまり Observable )を抱えているのですね。
そして、内部に Observer も抱えています。
IObservable<T> someObservable => this.someSubject の形で登場する Observable は、 Subjcet の内部 Observable のラッパーということになります。

実際には内部クラスである必要もないので、 SubjcetIObservableIObserver を実装することになりますが、整理するときは内部クラスに分けて考えた方がわかりやすいと思います。

Observable.Subscribe(Observer) は誰が誰に命令をしている?

Pub/Subパターンで考えると Observer = Subscriber なので、やっぱり購読するのはObserverなのだと思います。
Observable は中継役なので、こんな整理が良さそうです。

  • 誰が - Observable 利用クラスが
  • 誰に - Observable
  • 何を - 購読手続きを行って Observer が購読できるようにしてと

命令している。

Subject.OnNext() は誰が誰に何を命令している?

自作の Subject からメッセージを通知させるときには Subject.OnNext() を呼び出します。
これが、「 Observer.OnNext() は『次の通知が来たから処理して』という命令であるのに対し、 Subject.OnNext() はどういう意図の命名なのだろうか」と疑問でした。
これには以下のように処理を行っていると考えれば辻褄を合わせられそうです。

  1. Subject 利用クラスから SubjectObserver ラッパーの OnNext() を呼び出して発行通知を行う。
  2. SubjectObserver ラッパーは、自身内部の ObserverOnNext() を呼び出して発行通知をスルー。
  3. 内部 Observer は通知受信時の処理を行う。それは Subject の内部 BrokerPublish() を命令することである。
  4. 内部 Broker は事前に購読依頼を受けていた Observer 達の OnNext() を呼び出して発行通知を行う。

Subject.OnNext() と考えるのではなく、「 Subject の内部 ObserverSubject 利用クラスを観測しているので、然るべき時が来たら利用クラスから Observer.OnNext() を呼び出して発行通知を行う」と捉えます。

つまりこういうこと。

  • 誰が - Subscribe 利用クラスが
  • 誰に - Subject 内部の Observer
  • 何を - 発行通知受信時の処理を行うようにと

命令している。

改めて整理

ここまでの話を踏まえ、購読から通知までの処理の流れを整理します。

// 1. Subjectを生成。
private readonly Subject<Unit> someSubject = new Subject<Unit>();

// 2. Subject生成時にSubjectの内部Observerは利用クラスの観測を開始したと見なす。
//    こんなイメージ。
private IObserver<Unit> SomeObserver => this.someSubject;

// 3. Subjectの内部Observableを公開してこれを中継役とする。
public IObservable<Unit> SomeObservable => this.someSubject;

private void SubscribeEvent()
{
    // 4. 3.のObservableに購読手続きを依頼。実際に購読するObserverを引数で渡す。
    var observer = Observer.Create<Unit>(onNext: _ => DoSomethingOnNext());
    this.SomeObservable.Subscribe(observer);
}

private void FireEvent()
{
    // 5. イベント発火タイミングが来たので、当クラスを監視していることになっているObserverに
    //    通知受信時の処理を行うように命令する。
    this.SomeObserver.OnNext(Unit.Default);

    // 6. 5.の命令を受けてSomeObserverはSomeObservable.Publish()を実行し、発行を依頼する。

    // 7. 6.の命令を受けてSomeObservableは、4.で登録されていたObserverのOnNext()を呼び出し、
    //    通知受信時の処理を行うように命令する。
}
  • Observer は観測する人。
  • Observable は観測可能な状態を作る中継役。
  • Subscribe()Observable 利用者から Observable への購読手続きの依頼。購読するのは Observer
  • OnNext()Observer に対しての、「次の号が発行されたから、ふさわしい処理を行ってね」という通知。

各クラスの役割や、メソッドの主語が明確になって理解しやすくなりました。

これはあくまで一つの解釈の仕方ですが、Rxの理解で混乱している方のヒントとなれば幸いです。

7
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
4