この記事はRxJava Advent Calendar 2015 11日目の記事です。
RxJava・RxAndroidの紹介記事の中にはよく「面倒くさいASyncTaskを使わなくて済む」といったRxJavaを非同期処理の代わりとして紹介されています。
もちろん、非同期処理を置き換えられるという点はRxJavaの有効的な使い方の一つですが、RxJavaには他にも便利に使える機能があります。
今回はそれらの機能の一つであるSubjectについて紹介したいと思います。
Subjectとは
短く言うと「Subscriber
とObservable
の2つの機能を併せ持ったもの」です。
Subscriber
にあるようなonNext
やonError
、onComplete
といったメソッドを呼び出せ、Observable
のようにsubscribe
メソッドを呼び出すことができます。
Subject<String, String> subject = BehavorSubject.create();
subject.onNext("Hoge");
subject.subscribe(System.out::println); // "Hoge"が標準出力へ出力される
Observable.create()
を使ったObservableの生成方法だと、任意のタイミングでonNext
などを呼び出すことが非常に面倒になってしまいますが、SubjectではSubject自身に対してonNext
などを呼ぶことができるため、任意のタイミングでの呼び出しが簡単にできます。
Subjectの種類と特徴
Subjectには、onNext
で渡された値をどのようにするかで複数種類の物があります。
それぞれを使い分けることで、今まで複雑になりがちだった記述を短くきれいに書くことができるようになります。
AsyncSubject
AsyncSubjectでは、AsyncSubject側のonCompleted
が呼ばれた直後に、最後にonNext
で渡された値のみがSubscriberのonNext
の引数として渡されます。
途中でAsyncSubject側のonError
が呼ばれた場合は、SubscriberのonNext
は呼ばれずにonError
が呼ばれます。
使いどころ
onNextが呼ばれるのが最後の要素だけで、onCompletedを呼ばないと要素が流れてこないというのが少し独特ですが、値が一つしか流れてこないor最後の一つしか必要でない時に、コールバックを受け取る実装をするのに便利です。
例として以下のシチュエーションをAsyncSubjectで実装してみたいと思います。(実装は基本的にRxAndroidを使用しています。)
画面が表示されるより前にデータ取得を開始する。
画面が表示された時にデータ取得が終わっていればそのデータを表示する。
終わっていなければロード中のインジケーターを表示して、データ取得が完了し次第取得したデータを表示する。
// Applicationクラス(HogeApplicationとする)のonCreateでインスタンス化しておく。
public class Initializer {
public AsyncSubject<Data> asyncSubject = AsyncSubject.create();
public Initializer(){
generateObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
asyncSubject.onNext(data);
asyncSubject.onCompleted();
});
}
public Observable<Data> generateObservable(){
return Observable.create(subscriber -> subscriber.onNext(longTask()));
}
public Data longTask() {
//時間のかかる処理 (ネットワークからのデータ取得など)
}
}
// MainActivityでボタンをタップしたりした後に呼ばれると仮定
public class SubActivity extends RxActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_sub);
// ここでダイアログを表示させる
AlertDialog dialog = ......;
dialog.show();
// HogeApplication#getInitializer() が上のInitializerのGetter
((HogeApplication) getApplication()).getInitializer().asyncSubject
.compose(bindToLifecycle()) // AndroidのライフサイクルにBind
.subscribe(data -> {
// dataをリストに表示したりする
dialog.dismiss();
}, throwable -> {
// エラーが出たことを表示する
dialog.dismiss();
});
}
}
画面を表示するより前にデータが来てしまった時と、データが来なかった時両方に一つのコードで対応することができるようになりました。
BehaviorSubject
直前にonNext
で渡された値を保持し、subscribe()
した直後に保持していた値を流します。その後の動作は後述のPublishSubject
と同等です。
一度onError
で例外が流されると、onNext
は無視され、subscribe()
後にSubscriber側のonError
が呼ばれます。
使いどころ
直前にonNext
で渡したデータがやってくるという点で、使いどころは少し限定されていますが、RxLifecycleで、ActivityやFragmentのライフサイクルを取得・判別する部分で使われているのが一番しっくり来た使い方でした。
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> source) {
return source.takeUntil(
Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent == bindUntilEvent;
}
})
.onErrorReturn(RESUME_FUNCTION)
.takeFirst(SHOULD_COMPLETE)
);
}
};
上のコードは、RxLifecycleのRxLifecycle.javaより一部抜粋・修正を加えたものです
source
はライフサイクルを適用させるObservable。
correspondingEvents
は、ActivityやFragmentのイベントに対して、対のイベントに変換を行うFunction(例:OnCreate
ならばOnDestroy
に変換)。
lifecycle
は、対象のAcitvityのライフサイクルの状態が流れてくるBehaviorSubjectです。
コードを見ると、lifecycle.take(1).map(correspondingEvents)
でバインドしたライフサイクルに対する終わりのイベントを取得し、lifecycle.skip(1)
とcombineLatest
でつなげることで、lifecycle
に新しい値が流れてくるごとに評価が行われます。
この評価で、終わりのイベントと次のイベントが同じ(終わりのライフサイクルに到達した)時にtakeUntil
で元のObservableを途中で止めています。
この、ライフサイクルイベントのストリームとしてBehaviorSubjectが使われています。前の状態をキャッシュしてくれているので、subscribe時に直前に起きていたイベントを取得できるようになるわけです。
(onCreateからonStartの間にbindToLifecycle
した場合は、lifecycle
にonCreateのイベントがキャッシュされている)
PublishSubject
Subjectの中で、一番動作の理解がしやすいSubjectです。SubjectへのonNext
,onError
, onSuccessed
すべての呼び出しが、Subscriber側の同じメソッドにそのまま伝搬します。言ってしまえば、普通のリスナーと同じような挙動をします。
使いどころ
上で言ってしまったように、リスナー系の仕組みをRxを使用して置き換えるようなときに使用することができます。
例として、ArrayAdapterなどでリストを表示するときに、クリックイベントを元のActivityやFragmentへ返す動作を実装してみます。
public PublishSubject<Integer> clickObservable = PublishSubject.create();
@Override
public View getView(int position, View view, ViewGroup parent) {
ViewHolder holder;
if (view == null) {
view = inflater.inflate(R.layout.view_list_item_hoge, null);
holder = new ViewHolder(view);
view.setTag(holder);
} else {
holder = (ViewHolder) view.getTag();
}
Item item = getItem(position);
holder.text.setText(item.getData());
view.setOnClickListener(v -> clickObservable.onNext(history));
return view;
}
public HogeActivity extends RxActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_hoge);
// ここでダイアログを表示させる
ListView list = (ListView) findViewById(R.id.list_hoge);
HogeAdapter adapter = new HogeAdapter();
adapter.clickObservable
.compose(bindToLifecycle())
.subscribe(position -> {
// position番目の要素がクリックされた時の動作
});
}
}
これで、HogeActivity側でクリックされたポジションのイベントを取得できるようなりました。
ReplaySubject
大きな特徴は、subscribe()
した後に、Subjectに今までonNext
で流されていた値が全てやってくるという点です。
今までのデータがすべて流れてきたあとの動作はPublishSubjectと同等です。
使いどころ
元データからの差分をObservableに流すような機構と相性が合う気がします。
また、skip
すれば任意の場所の値を取れるという部分を活用すれば、Listの代わりとして使えそうでもあります。
SerializedSubject
今までのSubjectはすべて、Subject側のon***
メソッドを同じスレッドから呼び出す必要がありました。
もし、複数のスレッドからon***
メソッドが呼ばれる場合は、このSerializedSubjectでラッピングしてあげる必要があります。
PublishSubject<Data> subject = PublishSubject.create();
SerializedSubject<Data, Data> serializedSubject = new SerializedSubject(subject);
おわりに
Subjectはとてもクセのある種類がありますが、使い分ければそれぞれで便利に使うことができます。
なにかミスなどありましたらコメントにて教えていただければと思います。
ライセンス表記
この記事で使っている画像はすべてReactiveXの著作物であり、ReactiveX - Subject のページから引用したものです。
画像のライセンスはCreative Commons Attribution 3.0の下、表示しています。
RxLifecycleより抜粋したコードはApacheLicense Version 2.0の下で公開されます。
それ以外の私が制作したコードはPublicDomainの下で公開します。