13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJavaの処理順をコードリーディング

Last updated at Posted at 2016-03-10

#RxJavaの処理順をコードリーディング

RxJavaってどんな順で処理してるんだろう。
ログや経験則でなんとなくわかる部分もあるが、エンジニアならコードリーディング。

※今回は一旦、observeOnやsubscribeOnなどのスケジューラの話はサンプルコード含め省略します。

目的

処理順やコードリーディングによる処理の中身をみることで、
RxJavaのユースケースについて考える。

##コードリーディング
###STEP1 RxJava 全体像把握
よくあるRxJavaを使ったAndroidのコードを簡略かつ少し分けて書くとこんな感じだと思います。
※ここではflatMap()やmap()等のオペレータでの加工は複雑化するので置いておきます。


// ① onSubscribeを作って
Observable.OnSubscribe<MyClass> onSubscribe = new Observable.OnSubscribe<MyClass>() {
    @Override
    public void call(Subscriber<? super MyClass> subscriber) {
            try {
                // sbscribe()後、最初に実行したい処理  ※今回はとりあえず意味もなくMyClassをnewするだけ
                MyClass myClass = new MyClass();
                
                subscriber.onNext(myClass);
                subscriber.onCompleted();
                
            } catch (Exception e) {
                subscriber.onError(e);
            }
    }
};

// ② onSubscribeからobservableを作って
observable = Observable.create(onSubscribe);


// ③ subscribe
observable  // Obserbavle<MyClass>
.subscribe(new Subscriber<MyClass>() {

    @Override
    public void onCompleted() {
    
    }

    @Override
    public void onError(Throwable e) {
    
    }

    @Override
    public void onNext(final MyClass myClass) {
        // myClassを使った処理
    }

});

つまりもっと丸めるとここでやってる処理らしい処理は↓これだけ
あとはコールバックが順番に動くというのがざっくり全体像


observable  // Obserbavle<MyClass>
.subscribe(new Subscriber<MyClass>() {

    @Override
    public void onCompleted() {
    
    }

    @Override
    public void onError(Throwable e) {
    
    }

    @Override
    public void onNext(final MyClass myClass) {
        // myClassを使った処理
    }

});

STEP2 subscribeのリーディングから出発

STEP1で処理はsubcribeで着火されるのだろうということがわかった。
そこでsubscribeから順に読んでいく。

subscribe()の中身を見てみるとこんなことをやってる

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

hookの部分は単純にRxJavaの実行用インスタンスみたい

RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

真ん中の↓の部分は単純に第二引数の「observable.onSubscribe」を返してるだけだった。

onSubscribeStart(observable, observable.onSubscribe)

つまり、丸めるとこう

hook.observable.onSubscribe.call(subscriber);

さて、このonSubscribeが実行しているcall(subscriber)は何を呼んでいるのか?
ここでSTEP1の①を思い出す。


// ① onSubscribeを作って
Observable.OnSubscribe<MyClass> onSubscribe = new Observable.OnSubscribe<MyClass>() {
    @Override
    public void call(Subscriber<? super MyClass> subscriber) {
            try {
                // sbscribe()後、最初に実行したい処理  ※今回はとりあえず意味もなくMyClassをnewするだけ
                MyClass myClass = new MyClass();
                
                subscriber.onNext(myClass);
                subscriber.onCompleted();
                
            } catch (Exception e) {
                subscriber.onError(e);
            }
    }
};

ははー
最初に動くのはこの①で作ったコールバックだ!

ということでnew Observable.OnSubscribe<MyClass>()で作ったコールバックがsubscribe直後に動くことが判明

OnSubscribeっていうくらいだからそりゃそうか

STEP3 onSubscribeのcallメソッドを通過

STEP2で最初に呼ばれるコールバックはonSubscribeのcallメソッドだということがわかりました。

そして、この流れだとcall(subscriber)のsubscriberはSTEP1の③のsubscribe()時に引数で渡しているnew Subscriber<MyClass>()だということがわかります。


observable  // Obserbavle<MyClass>
.subscribe(new Subscriber<MyClass>() {

    @Override
    public void onCompleted() {
    
    }

    @Override
    public void onError(Throwable e) {
    
    }

    @Override
    public void onNext(final MyClass myClass) {
        // myClassを使った処理
    }

});

つまり、onSubscribeのcallメソッドが実行しているonNext() onCompleted() onError()は↑こいつらだということがわかります。


Observable.OnSubscribe<MyClass> onSubscribe = new Observable.OnSubscribe<MyClass>() {
    @Override
    public void call(Subscriber<? super MyClass> subscriber) {
            try {
                // sbscribe()後、最初に実行したい処理  ※今回はとりあえず意味もなくMyClassをnewするだけ
                MyClass myClass = new MyClass();
                
                subscriber.onNext(myClass);
                subscriber.onCompleted();
                
            } catch (Exception e) {
                subscriber.onError(e);
            }
    }
};

コードリーディング結果

コードリーディングの結果下記のようなRxJavaコードであればコールバックが①②の順番で動くことがわかる。
今回の簡略化されたコードはあくまでもコード理解用のもので実用性はないが、
ここにflatMap()やMap()などのオペレータでの加工が入るともっと実用的でシーケンシャルな処理が可能になる。

flatMap()やMap()などのオペレータでの加工部分のコードリーディングはまた後日投稿しようと思います。


Observable.create(Observable.OnSubscribe<MyClass>() {
    @Override
    public void call(Subscriber<? super MyClass> subscriber) {
            try {
                // ①
                // sbscribe()後、最初に実行したい処理  ※今回はとりあえず意味もなくMyClassをnewするだけ
                MyClass myClass = new MyClass();
                
                subscriber.onNext(myClass);
                subscriber.onCompleted();
                
            } catch (Exception e) {
                subscriber.onError(e);
            }
    }
};).subscribe(new Subscriber<MyClass>() {

    @Override
    public void onCompleted() {
        // ②
    }

    @Override
    public void onError(Throwable e) {
        // ②
    }

    @Override
    public void onNext(final MyClass myClass) {
        // ②
    }

});

まとめ

ここまで見た限りではRxJavaはこれまであった処理自体を劇的に変えた
というよりは、コールバックのネストが発生する処理に対して見やすくかけるというのが特徴に見える。

そういう意味で、API通信周り以外でもコールバックのネストが発生する部分は検討価値がありそう。

13
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?