TLDR
-
compose
はメソッドチェーンの中でObseravble
にアクセスできる最高のオペレーター -
lift
との違いはObseravble
の中の値にアクセスできるかどうか -
flatmap
との違いはObseravble
の中の値にアクセスできるかどうか + 実行されるタイミング - RxLifecycleはtakeUntilでライフサイクルイベントをbindしてる。
RxJava Advent Calendar 5日目の今日は、compose
オペレーターについて紹介します。
compose
といえば、最近ではtrello/RxLifecycleでも使われているので、馴染みのある方も多いかもしれませんね。
compose
は初日のエントリで紹介したlift
と同じくRxJavaのメソッドチェーンを実現している要素の一つです。ではこの二つのオペレータにはどのような違いがあるのでしょうか?
liftとcomposeの違い
liftとcomposeの使い分けはjavadocの中に指針が書かれています。以下該当箇所を抜粋です。
If the operator you are creating is designed to act on the individual items emitted by a source Observable, use Observable.lift(rx.Observable.Operator<? extends R, ? super T>). If your operator is designed to transform the source Observable as a whole (for instance, by applying a particular set of existing RxJava operators to it) use compose.
雑に訳すと、
あなたが作ったオペレーターが、Obeservableから放出される個々の要素に作用するのであれば、"lift"を使うべきです。一方で、
あなたが作ったオペレーターが、Observableそのものに作用する場合には"compose"を使うべきでしょう。
といった感じでしょうか。
lift
とcompose
は似ているようで、実際には出来ることが異なります。表にまとめると次のようになります。
オペレーター | 直接Observableを触れるか | Observableの中の値にアクセスできるか |
---|---|---|
compose | 出来る | 出来ない |
lift | 出来ない | 出来る |
compose
は直接値に触れることが出来ないので、map
のような処理は行えません。
一方でcompose
はObservableに直接触れるメリットがあります。RxlifeCycle
はこの特性を利用してライフサイクルイベントをObservableに対してbindしているわけですね。
composeの仕組み
では実際にcompose
がどのように動作しているのか確認してみましょう。compose
の実装は次のようになっています。
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
return ((Transformer<T, R>) transformer).call(this);
}
Transformer<? super T, ? extends R>
を引数にとって、Transformerによる変換を自分自身に適応しているようですね。
では次に引数に登場するTransformer
の実装を見てみましょう。
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}
Transformerは、Obseravble<T>
を受け取りObservable<R>
を返すFunc1
のようですね。 こちらもとてもシンプルでした。
つまりTransformer
とは 「Observableを受け取り、加工して、返す関数」 なので、compose
とは 「Observableが自分自身に関数を適応するためのオペレーター」 であることがわかりました。 lift
の実装よりとてもシンプルでわかりやすい印象ですね。
composeとflatmapの違いについて
RxJavaの処理でよく使うものでflatmap
があります。これもObservable
を返すオペレーターですが、どのような違いがあるんでしょうか。
オペレーター | 呼ばれるタイミング | 返り値 | 直接Observableを触れるか | Observableの中の値にアクセスできるか |
---|---|---|---|---|
compose | subscribeされたタイミング(一度だけ) | Observable | 出来る | 出来ない |
flatmap | onNextが呼ばれるごと(複数回) | Observable | 出来ない | 出来る |
compose
とflatmap
もとても似ていますが、これらも用途は全然違います。
flatmap
はObservable
の中の値にアクセスできるので、その値を元に新しいObservableを作りリターンします。注目すべきポイントは、引数と返り値のObservable
のインスタンスが異なる点です。これはflatmap
は元のObservable
自体にはアクセス出来ない事が理由です。
一方で、compose
はObservable
の中の値にはアクセスできませんが、元のObservable
にはアクセスできるので、元のObservable
を加工して返します。
つまりcompose
の場合は、一般的には引数と返り値のObservable
のインスタンス同じになります。[^1]
[^1] Transformerは別インスタンスのObservableを返すことも可能ですが、想定された使い方ではありません。
RxLifeCycleについて
では冒頭で紹介したtrello/RxLifecycleはどのような仕組みで動いているのか実装を追ってみましょう。
Rxlifecycleのサンプルコードは次のようです。
public class MyActivity extends RxActivity {
@Override
public void onResume() {
super.onResume();
myObservable
.compose(bindToLifecycle())
.subscribe();
}
}
compose
が使われていますね。 引数に渡させれているbindToLifeCycle
とはなんでしょうか。実装を追ってみます。
@Override
public final <T> Observable.Transformer<? super T, ? extends T> bindToLifecycle() {
return RxLifecycle.bindActivity(lifecycleSubject);
}
bindToLifecycle
とは、RxLifecycle.bindActivity(lifecycleSubject)
を返しているだけで、ここにはロジックはないようです。
どんどん実装を深掘りしていきます。RxLifecycle.bindActivity
は次のような実装です。
public static <T> Observable.Transformer<? super T, ? extends T> bindActivity(Observable<ActivityEvent> lifecycle) {
return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
これもbind
をリターンしているだけですね。ではbind
の実装を確認しましょう。
private static <T, R> Observable.Transformer<? super T, ? extends T> bind(Observable<R> lifecycle,
final Func1<R, R> correspondingEvents) {
final Observable<R> sharedLifecycle = lifecycle.share();
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> source) {
return source.takeUntil(
Observable.combineLatest(
sharedLifecycle.take(1).map(correspondingEvents),
sharedLifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent == bindUntilEvent;
}
})
.takeFirst(SHOULD_COMPLETE)
);
}
};
}
どうやらこのbind(Observable<R> lifecycle, Func1<R, R> correspondingEvents)
の実装がコアの部分のようですね。順番に処理を追っていきましょう。(一部処理を簡略化してあります)
まず、final Observable<R> sharedLifecycle = lifecycle.share();
の部分ですね。ここではHotObservableに変換しています。これはライフサイクルイベントの検出漏れがないようにするためなのですが、詳しい話は省略します(僕もよくわかってない) ご興味があれば素晴らしい記事があるのでそちらをご覧ください。
return source.takeUntil(
Observable.combineLatest(
sharedLifecycle.take(1).map(correspondingEvents),
sharedLifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent == bindUntilEvent;
}
})
.takeFirst(SHOULD_COMPLETE)
);
次にこの本体部分について紹介します。combineLatest
の中身から見ていきましょう。(combineLatest
とは、イベントを待ち合わせて、関数を適応するオペレーターです)
Observable.combineLatest(
sharedLifecycle.take(1).map(correspondingEvents),
sharedLifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent == bindUntilEvent;
}
});
sharedLifecycle
というのはライフサイクルイベントですね。sharedLifecycle.take(1).map(correspondingEvents)
では、Observableによる処理が実行されたイベントと対応関係にあたるイベントを取得しています。onCreate
で呼ばれたならばonDestory
となります。 sharedLifecycle.skip(1)
はシンプルで2つ目移行のライフサイクルイベントを順番に取得していきます。Func2
の実装では、これら二つのイベントが一致するか確かめています。
return source.takeUntil(
Observable.combineLatest(
Observable[対応するライフサイクルイベントが呼ばれるとイベントを発行するObservable]
.takeFirst(SHOULD_COMPLETE)
);
takeUntil
は引数として受け取ったObservable
のイベントが発行されたら、レシーバーのObservableのイベントの発行をストップさせるオペレーターです。
takeFirst
は引数のFunc1で書かれた条件を満たしたイベントを一度だけ発行するオペレーターです。
この二つのオペレータを組み合わせると、対となるライフサイクルイベント(onCreate
で呼ばれたらonDestory
)が呼ばれたら、元Observable(composeのレシーバー)からのイベント発火を止めることが実現できます。
まとめ
compose
オペレーターについてご理解頂けたでしょうか。 RxJavaをAndroid開発をしているとRxLifecycleも使いたくなりますが、内部実装を把握することでより安心して利用できますよね。
全体的に雑なエントリになってしまって申し訳ないのですが、時間を見つけて修正したいと思います