Help us understand the problem. What is going on with this article?

RxJava 勉強会 Week4

More than 3 years have passed since last update.

RxJava 勉強会 Week4

by Shiozawa
1 / 42

RxJava2 の実装について


内容

簡単なサンプルコードを使って

  • Observable.create()
  • Observable.subscribe()

の実装を説明をします。

元ネタ:http://qiita.com/Shiozawa/items/0ccdb43d803de459c6f4


サンプルコード


URL

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week4/Week4.java


サンプルコード

// Observable の作成
Observable<Integer> observable =
    Observable.create(e -> {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    });

// Observable のアイテムをコンソールに出力
observable.subscribe(
    System.out::println,
    System.out::println,
    System.out::println,
    System.out::println
);

サンプルコードを展開

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

サンプルコードを展開

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

登場人物


Observable の生成

  • Observable<T> クラス
  • Observable.create() スタティックメソッド
  • ObservableOnSubscribe<T> インタフェース
  • ObservableEmitter<T> インタフェース

Observer の生成と subscribe

  • Observer<T> インタフェース
  • Observable.subscribe() インスタンスメソッド

Observable クラス


2つの役割

  • Observable インスタンスを生成するための static ファクトリーメソッド
  • Observable のインスタンスメソッド

static ファクトリーメソッド


インスタンスメソッド

  • subscribe() やメソッドチェーン
  • それぞれのインスタンスメソッドごとに、Observable を継承したクラスがあり、それを返している
    • map() は ObservableMap 返す

Observable.create()

  • Observable の static ファクトリーメソッドの一つ

Observable.create() ソース

Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugin.onAssembly は通常何もしない
    • テストの実行時のインジェクトなどに利用
  • ObservableCreate を生成して返す

ObservableCreate クラス


特徴

  • Observable を継承している
  • Observable の abstract メソッドである subscribeActual() を実装している
  • source プロパティとして ObservableOnSubscribe への参照を持っている

UML


ここまでのまとめ


サンプルコード

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

やってること

  • Observable.create()
    • Observable.create() は static ファクトリーメソッド
    • Observable を継承した ObservableCreate インスタンスを生成
  • ObservableCreate クラス
    • Observable を継承
    • 渡された ObservableOnSubscribe インスタンスへの参照を保持

Observable.subscribe()


サンプルコード

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

Observable.subscribe() コード

Observable.subscribe()
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribeActual()observer を渡しているだけ


Observable.subscribeActual() ソース

Observable.subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);

⇒ abstract メソッド


ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

ObservableCreate.subscribeActula() やっていること

  • CreateEmitter<T> インスタンスを作成
    • observer を渡す
  • Observer インスタンスの onSubscribe() を呼び出す
  • source プロパティの subscribe()CreateEmitter<T> を与えて呼び出す

ObservableCreate.CreateEmitter クラス

  • ObservableEmitter<T> を実装している
    • ObservableEmitter<T>Emitter<T> を継承している
  • observer プロパティとして Observer への参照を保持している

UML


(再掲)ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

source.subscribe()

ObservableOnSubscribeの無名実装
new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
}

CreateEmitter.onNext()

CreateEmitter.onNext()
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer は CreateEmitter のプロパティ
        // (final Observer<? super T> observer;)
        observer.onNext(t);
    }
}

⇒ onNext をただ呼び出しているだけ


ここまでのまとめ

  • Observer.subscribe()
    • subscribeActual() を呼び出す
    • subscribeActual() は abstract メソッド
    • 今回のケースでは、実装しているのは ObservableCreateObservableCreate.subscribeActual() が呼び出される
  • ObservableCreate.subscribeActual()
    • CreateEmitter<T> を生成
    • CreateEmitter<T> は渡された Observer をラップ
    • CreateEmitter<T>.onNext() が呼び出されると、Observer.onNext() がそのまま呼び出される

わかったこと1

Observable クラスには二つの役割がある。

  • Observable インスタンスを生成するための static ファクトリーメソッドを提供
  • Observable のインスタンスメソッドを提供

わかったこと2

Observable のメソッドは、基本的に Observable の派生クラスを生成して返す。

  • 例1:Observable.create()ObservableCreate を生成
  • 例2:Observable.map()ObservableMap を生成

わかったこと3

Observable.subscribe()Observable.subscribeActual() を呼び出している。

  • subscribeActual() は abstract メソッド
  • Observable を継承したクラスが性質に応じて subscribeActual() を実装

わかったこと4

ObservableCreate.subscribeActual() は次の処理を行っている。

  • Observer をラップするの CreateEmitter<T> を生成
  • ObservableOnSubscribe.subscribe() にその CreateEmitter<T> インスタンスを渡す
  • CreateEmitter<T>onNext()ObserveronNext() に転送

わかったこと5

このケースでは subscribe() をきっかけとして Observable が動き出す

  • ⇒ Cold な Observable である

次回


次回

  • map() などを使ったメソッドチェーンの実装について
  • Scheduler 指定の実装について(tranpoline の効果が分かるかも?)
  • Emitter を動的(?)にする方法(毎回 Observable を作らない)
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした