Edited at
RxJavaDay 11

Rxで知っておくと便利なSubjectたち

More than 1 year has passed since last update.

この記事はRxJava Advent Calendar 2015 11日目の記事です。

RxJava・RxAndroidの紹介記事の中にはよく「面倒くさいASyncTaskを使わなくて済む」といったRxJavaを非同期処理の代わりとして紹介されています。

もちろん、非同期処理を置き換えられるという点はRxJavaの有効的な使い方の一つですが、RxJavaには他にも便利に使える機能があります。

今回はそれらの機能の一つであるSubjectについて紹介したいと思います。


Subjectとは

短く言うと「SubscriberObservableの2つの機能を併せ持ったもの」です。

SubscriberにあるようなonNextonErroronCompleteといったメソッドを呼び出せ、Observableのようにsubscribeメソッドを呼び出すことができます。


Subject1.java

Subject<String, String> subject = BehavorSubject.create();

subject.onNext("Hoge");
subject.subscribe(System.out::println); // "Hoge"が標準出力へ出力される

Observable.create()を使ったObservableの生成方法だと、任意のタイミングでonNextなどを呼び出すことが非常に面倒になってしまいますが、SubjectではSubject自身に対してonNextなどを呼ぶことができるため、任意のタイミングでの呼び出しが簡単にできます。


Subjectの種類と特徴

Subjectには、onNextで渡された値をどのようにするかで複数種類の物があります。

それぞれを使い分けることで、今まで複雑になりがちだった記述を短くきれいに書くことができるようになります。


AsyncSubject

AsyncSubjectでは、AsyncSubject側のonCompletedが呼ばれた直後に、最後にonNextで渡された値のみがSubscriberのonNextの引数として渡されます。

途中でAsyncSubject側のonErrorが呼ばれた場合は、SubscriberのonNextは呼ばれずにonErrorが呼ばれます。


使いどころ

onNextが呼ばれるのが最後の要素だけで、onCompletedを呼ばないと要素が流れてこないというのが少し独特ですが、値が一つしか流れてこないor最後の一つしか必要でない時に、コールバックを受け取る実装をするのに便利です。

例として以下のシチュエーションをAsyncSubjectで実装してみたいと思います。(実装は基本的にRxAndroidを使用しています。)


画面が表示されるより前にデータ取得を開始する。

画面が表示された時にデータ取得が終わっていればそのデータを表示する。

終わっていなければロード中のインジケーターを表示して、データ取得が完了し次第取得したデータを表示する。



Initializer.java

// Applicationクラス(HogeApplicationとする)のonCreateでインスタンス化しておく。

public class Initializer {
public AsyncSubject<Data> asyncSubject = AsyncSubject.create();

public Initializer(){
generateObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
asyncSubject.onNext(data);
asyncSubject.onCompleted();
});
}

public Observable<Data> generateObservable(){
return Observable.create(subscriber -> subscriber.onNext(longTask()));
}

public Data longTask() {
//時間のかかる処理 (ネットワークからのデータ取得など)
}
}



SubActivity.java

// MainActivityでボタンをタップしたりした後に呼ばれると仮定

public class SubActivity extends RxActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_sub);

// ここでダイアログを表示させる
AlertDialog dialog = ......;
dialog.show();

// HogeApplication#getInitializer() が上のInitializerのGetter
((HogeApplication) getApplication()).getInitializer().asyncSubject
.compose(bindToLifecycle()) // AndroidのライフサイクルにBind
.subscribe(data -> {
// dataをリストに表示したりする
dialog.dismiss();
}, throwable -> {
// エラーが出たことを表示する
dialog.dismiss();
});
}
}


画面を表示するより前にデータが来てしまった時と、データが来なかった時両方に一つのコードで対応することができるようになりました。


BehaviorSubject

直前にonNextで渡された値を保持し、subscribe()した直後に保持していた値を流します。その後の動作は後述のPublishSubjectと同等です。

一度onErrorで例外が流されると、onNextは無視され、subscribe()後にSubscriber側のonErrorが呼ばれます。


使いどころ

直前にonNextで渡したデータがやってくるという点で、使いどころは少し限定されていますが、RxLifecycleで、ActivityやFragmentのライフサイクルを取得・判別する部分で使われているのが一番しっくり来た使い方でした。

RxLifecycleの紹介


RxLifecycle.java

return new Observable.Transformer<T, T>() {

@Override
public Observable<T> call(Observable<T> source) {
return source.takeUntil(
Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new Func2<R, R, Boolean>() {
@Override
public Boolean call(R bindUntilEvent, R lifecycleEvent) {
return lifecycleEvent == bindUntilEvent;
}
})
.onErrorReturn(RESUME_FUNCTION)
.takeFirst(SHOULD_COMPLETE)
);
}
};

上のコードは、RxLifecycleのRxLifecycle.javaより一部抜粋・修正を加えたものです

sourceはライフサイクルを適用させるObservable。

correspondingEventsは、ActivityやFragmentのイベントに対して、対のイベントに変換を行うFunction(例:OnCreateならばOnDestroyに変換)。

lifecycleは、対象のAcitvityのライフサイクルの状態が流れてくるBehaviorSubjectです。

コードを見ると、lifecycle.take(1).map(correspondingEvents)でバインドしたライフサイクルに対する終わりのイベントを取得し、lifecycle.skip(1)combineLatestでつなげることで、lifecycleに新しい値が流れてくるごとに評価が行われます。

この評価で、終わりのイベントと次のイベントが同じ(終わりのライフサイクルに到達した)時にtakeUntilで元のObservableを途中で止めています。

この、ライフサイクルイベントのストリームとしてBehaviorSubjectが使われています。前の状態をキャッシュしてくれているので、subscribe時に直前に起きていたイベントを取得できるようになるわけです。

(onCreateからonStartの間にbindToLifecycleした場合は、lifecycleにonCreateのイベントがキャッシュされている)


PublishSubject



Subjectの中で、一番動作の理解がしやすいSubjectです。SubjectへのonNext,onError, onSuccessedすべての呼び出しが、Subscriber側の同じメソッドにそのまま伝搬します。言ってしまえば、普通のリスナーと同じような挙動をします。


使いどころ

上で言ってしまったように、リスナー系の仕組みをRxを使用して置き換えるようなときに使用することができます。

例として、ArrayAdapterなどでリストを表示するときに、クリックイベントを元のActivityやFragmentへ返す動作を実装してみます。


HogeAdapter.java

public PublishSubject<Integer> clickObservable = PublishSubject.create();

@Override
public View getView(int position, View view, ViewGroup parent) {
ViewHolder holder;
if (view == null) {
view = inflater.inflate(R.layout.view_list_item_hoge, null);
holder = new ViewHolder(view);
view.setTag(holder);
} else {
holder = (ViewHolder) view.getTag();
}

Item item = getItem(position);
holder.text.setText(item.getData());
view.setOnClickListener(v -> clickObservable.onNext(history));
return view;
}



HogeActivity.java

public HogeActivity extends RxActivity {

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_hoge);

// ここでダイアログを表示させる
ListView list = (ListView) findViewById(R.id.list_hoge);
HogeAdapter adapter = new HogeAdapter();
adapter.clickObservable
.compose(bindToLifecycle())
.subscribe(position -> {
// position番目の要素がクリックされた時の動作
});
}
}


これで、HogeActivity側でクリックされたポジションのイベントを取得できるようなりました。


ReplaySubject

大きな特徴は、subscribe()した後に、Subjectに今までonNextで流されていた値が全てやってくるという点です。

今までのデータがすべて流れてきたあとの動作はPublishSubjectと同等です。


使いどころ

元データからの差分をObservableに流すような機構と相性が合う気がします。

また、skipすれば任意の場所の値を取れるという部分を活用すれば、Listの代わりとして使えそうでもあります。


SerializedSubject

今までのSubjectはすべて、Subject側のon***メソッドを同じスレッドから呼び出す必要がありました。

もし、複数のスレッドからon***メソッドが呼ばれる場合は、このSerializedSubjectでラッピングしてあげる必要があります。

PublishSubject<Data> subject = PublishSubject.create();

SerializedSubject<Data, Data> serializedSubject = new SerializedSubject(subject);


おわりに

Subjectはとてもクセのある種類がありますが、使い分ければそれぞれで便利に使うことができます。

なにかミスなどありましたらコメントにて教えていただければと思います。


ライセンス表記

この記事で使っている画像はすべてReactiveXの著作物であり、ReactiveX - Subject のページから引用したものです。

画像のライセンスはCreative Commons Attribution 3.0の下、表示しています。

RxLifecycleより抜粋したコードはApacheLicense Version 2.0の下で公開されます。

それ以外の私が制作したコードはPublicDomainの下で公開します。