Rxjavaのcompose、あるいはRxLifeCycleについて

  • 43
    いいね
  • 1
    コメント
この記事は最終更新日から1年以上が経過しています。

TLDR

  • composeはメソッドチェーンの中でObseravbleにアクセスできる最高のオペレーター
  • liftとの違いはObseravbleの中の値にアクセスできるかどうか
  • flatmapとの違いはObseravbleの中の値にアクセスできるかどうか + 実行されるタイミング
  • RxLifecycleはtakeUntilでライフサイクルイベントをbindしてる。

RxJava Advent Calendar 5日目の今日は、composeオペレーターについて紹介します。
composeといえば、最近ではtrello/RxLifecycleでも使われているので、馴染みのある方も多いかもしれませんね。
compose初日のエントリで紹介したliftと同じくRxJavaのメソッドチェーンを実現している要素の一つです。ではこの二つのオペレータにはどのような違いがあるのでしょうか?

liftとcomposeの違い

liftとcomposeの使い分けはjavadocの中に指針が書かれています。以下該当箇所を抜粋です。

If the operator you are creating is designed to act on the individual items emitted by a source Observable, use Observable.lift(rx.Observable.Operator<? extends R, ? super T>). If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) use compose.

雑に訳すと、

あなたが作ったオペレーターが、Obeservableから放出される個々の要素に作用するのであれば、"lift"を使うべきです。一方で、
あなたが作ったオペレーターが、Observableそのものに作用する場合には"compose"を使うべきでしょう。

といった感じでしょうか。

liftcomposeは似ているようで、実際には出来ることが異なります。表にまとめると次のようになります。

オペレーター 直接Observableを触れるか Observableの中の値にアクセスできるか
compose 出来る 出来ない
lift 出来ない 出来る

composeは直接値に触れることが出来ないので、mapのような処理は行えません。
一方でcomposeはObservableに直接触れるメリットがあります。RxlifeCycleはこの特性を利用してライフサイクルイベントをObservableに対してbindしているわけですね。

composeの仕組み

では実際にcomposeがどのように動作しているのか確認してみましょう。composeの実装は次のようになっています。

public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
    return ((Transformer<T, R>) transformer).call(this);
}

Transformer<? super T, ? extends R>を引数にとって、Transformerによる変換を自分自身に適応しているようですね。

では次に引数に登場するTransformerの実装を見てみましょう。

public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
    // cover for generics insanity
}

Transformerは、Obseravble<T>を受け取りObservable<R>を返すFunc1のようですね。 こちらもとてもシンプルでした。

つまりTransformerとは 「Observableを受け取り、加工して、返す関数」 なので、composeとは 「Observableが自分自身に関数を適応するためのオペレーター」 であることがわかりました。 liftの実装よりとてもシンプルでわかりやすい印象ですね。

composeとflatmapの違いについて

RxJavaの処理でよく使うものでflatmapがあります。これもObservableを返すオペレーターですが、どのような違いがあるんでしょうか。

オペレーター 呼ばれるタイミング 返り値 直接Observableを触れるか Observableの中の値にアクセスできるか
compose subscribeされたタイミング(一度だけ) Observable 出来る 出来ない
flatmap onNextが呼ばれるごと(複数回) Observable 出来ない 出来る

composeflatmapもとても似ていますが、これらも用途は全然違います。

flatmapObservableの中の値にアクセスできるので、その値を元に新しいObservableを作りリターンします。注目すべきポイントは、引数と返り値のObservableのインスタンスが異なる点です。これはflatmapは元のObservable自体にはアクセス出来ない事が理由です。
一方で、composeObservableの中の値にはアクセスできませんが、元のObservableにはアクセスできるので、元のObservableを加工して返します。
つまりcomposeの場合は、一般的には引数と返り値のObservableのインスタンス同じになります。[^1]

[^1] Transformerは別インスタンスのObservableを返すことも可能ですが、想定された使い方ではありません。

RxLifeCycleについて

では冒頭で紹介したtrello/RxLifecycleはどのような仕組みで動いているのか実装を追ってみましょう。

Rxlifecycleのサンプルコードは次のようです。

public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(bindToLifecycle())
            .subscribe();
    }
}

composeが使われていますね。 引数に渡させれているbindToLifeCycleとはなんでしょうか。実装を追ってみます。

@Override
public final <T> Observable.Transformer<? super T, ? extends T> bindToLifecycle() {
    return RxLifecycle.bindActivity(lifecycleSubject);
}

bindToLifecycleとは、RxLifecycle.bindActivity(lifecycleSubject)を返しているだけで、ここにはロジックはないようです。

どんどん実装を深掘りしていきます。RxLifecycle.bindActivityは次のような実装です。

public static <T> Observable.Transformer<? super T, ? extends T> bindActivity(Observable<ActivityEvent> lifecycle) {
    return bind(lifecycle, ACTIVITY_LIFECYCLE);
}

これもbindをリターンしているだけですね。ではbindの実装を確認しましょう。

 private static <T, R> Observable.Transformer<? super T, ? extends T> bind(Observable<R> lifecycle,
                                                      final Func1<R, R> correspondingEvents) {
  final Observable<R> sharedLifecycle = lifecycle.share();
  return new Observable.Transformer<T, T>() {
      @Override
      public Observable<T> call(Observable<T> source) {
          return source.takeUntil(
              Observable.combineLatest(
                  sharedLifecycle.take(1).map(correspondingEvents),
                  sharedLifecycle.skip(1),
                  new Func2<R, R, Boolean>() {
                      @Override
                      public Boolean call(R bindUntilEvent, R lifecycleEvent) {
                          return lifecycleEvent == bindUntilEvent;
                      }
                  })
                  .takeFirst(SHOULD_COMPLETE)
          );
      }
  };
}        

どうやらこのbind(Observable<R> lifecycle, Func1<R, R> correspondingEvents)の実装がコアの部分のようですね。順番に処理を追っていきましょう。(一部処理を簡略化してあります)

まず、final Observable<R> sharedLifecycle = lifecycle.share(); の部分ですね。ここではHotObservableに変換しています。これはライフサイクルイベントの検出漏れがないようにするためなのですが、詳しい話は省略します(僕もよくわかってない) ご興味があれば素晴らしい記事があるのでそちらをご覧ください。

return source.takeUntil(
      Observable.combineLatest(
          sharedLifecycle.take(1).map(correspondingEvents),
          sharedLifecycle.skip(1),
          new Func2<R, R, Boolean>() {
              @Override
              public Boolean call(R bindUntilEvent, R lifecycleEvent) {
                  return lifecycleEvent == bindUntilEvent;
              }
          })
          .takeFirst(SHOULD_COMPLETE)
  );

次にこの本体部分について紹介します。combineLatestの中身から見ていきましょう。(combineLatestとは、イベントを待ち合わせて、関数を適応するオペレーターです)

Observable.combineLatest(
  sharedLifecycle.take(1).map(correspondingEvents),
  sharedLifecycle.skip(1),
  new Func2<R, R, Boolean>() {
      @Override
      public Boolean call(R bindUntilEvent, R lifecycleEvent) {
          return lifecycleEvent == bindUntilEvent;
      }
});

sharedLifecycleというのはライフサイクルイベントですね。sharedLifecycle.take(1).map(correspondingEvents) では、Observableによる処理が実行されたイベントと対応関係にあたるイベントを取得しています。onCreateで呼ばれたならばonDestoryとなります。 sharedLifecycle.skip(1)はシンプルで2つ目移行のライフサイクルイベントを順番に取得していきます。Func2の実装では、これら二つのイベントが一致するか確かめています。

return source.takeUntil(
      Observable.combineLatest(
          Observable[対応するライフサイクルイベントが呼ばれるとイベントを発行するObservable]
          .takeFirst(SHOULD_COMPLETE)
  );

takeUntilは引数として受け取ったObservableのイベントが発行されたら、レシーバーのObservableのイベントの発行をストップさせるオペレーターです。
takeFirstは引数のFunc1で書かれた条件を満たしたイベントを一度だけ発行するオペレーターです。
この二つのオペレータを組み合わせると、対となるライフサイクルイベント(onCreateで呼ばれたらonDestory)が呼ばれたら、元Observable(composeのレシーバー)からのイベント発火を止めることが実現できます。

まとめ

composeオペレーターについてご理解頂けたでしょうか。 RxJavaをAndroid開発をしているとRxLifecycleも使いたくなりますが、内部実装を把握することでより安心して利用できますよね。
全体的に雑なエントリになってしまって申し訳ないのですが、時間を見つけて修正したいと思います :bow: