LoginSignup
0
3

More than 5 years have passed since last update.

RxJava 勉強会 Week4

Last updated at Posted at 2017-03-29
1 / 42

RxJava2 の実装について


内容

簡単なサンプルコードを使って

  • Observable.create()
  • Observable.subscribe()

の実装を説明をします。

元ネタ:http://qiita.com/Shiozawa/items/0ccdb43d803de459c6f4


サンプルコード


URL


サンプルコード

// Observable の作成
Observable<Integer> observable =
    Observable.create(e -> {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    });

// Observable のアイテムをコンソールに出力
observable.subscribe(
    System.out::println,
    System.out::println,
    System.out::println,
    System.out::println
);

サンプルコードを展開

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

サンプルコードを展開

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

登場人物


Observable の生成

  • Observable<T> クラス
  • Observable.create() スタティックメソッド
  • ObservableOnSubscribe<T> インタフェース
  • ObservableEmitter<T> インタフェース

Observer の生成と subscribe

  • Observer<T> インタフェース
  • Observable.subscribe() インスタンスメソッド

Observable クラス


2つの役割

  • Observable インスタンスを生成するための static ファクトリーメソッド
  • Observable のインスタンスメソッド

static ファクトリーメソッド


インスタンスメソッド

  • subscribe() やメソッドチェーン
  • それぞれのインスタンスメソッドごとに、Observable を継承したクラスがあり、それを返している
    • map() は ObservableMap 返す

Observable.create()

  • Observable の static ファクトリーメソッドの一つ

Observable.create() ソース

Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugin.onAssembly は通常何もしない
    • テストの実行時のインジェクトなどに利用
  • ObservableCreate を生成して返す

ObservableCreate クラス


特徴

  • Observable を継承している
  • Observable の abstract メソッドである subscribeActual() を実装している
  • source プロパティとして ObservableOnSubscribe への参照を持っている

UML


ここまでのまとめ


サンプルコード

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override 
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
       }
    });

やってること

  • Observable.create()
    • Observable.create() は static ファクトリーメソッド
    • Observable を継承した ObservableCreate インスタンスを生成
  • ObservableCreate クラス
    • Observable を継承
    • 渡された ObservableOnSubscribe インスタンスへの参照を保持

Observable.subscribe()


サンプルコード

observable.subscribe(new Observer<Integer>() {
    @Override 
    public void onSubscribe(Disposable d) {
        System.out.println(d);
    }
    @Override 
    public void onNext(Integer integer) {
        System.out.println(integer);
    }
    @Override 
    public void onError(Throwable e) {
        System.out.println(e);
    }
    @Override 
    public void onComplete() {
       // 何も出力しない
       System.out.println();
    }
});

Observable.subscribe() コード

Observable.subscribe()
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribeActual()observer を渡しているだけ


Observable.subscribeActual() ソース

Observable.subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);

⇒ abstract メソッド


ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

ObservableCreate.subscribeActula() やっていること

  • CreateEmitter<T> インスタンスを作成
    • observer を渡す
  • Observer インスタンスの onSubscribe() を呼び出す
  • source プロパティの subscribe()CreateEmitter<T> を与えて呼び出す

ObservableCreate.CreateEmitter クラス

  • ObservableEmitter<T> を実装している
    • ObservableEmitter<T>Emitter<T> を継承している
  • observer プロパティとして Observer への参照を保持している

UML


(再掲)ObservableCreate.subscribeActual() ソース

ObservableCreate.subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
       // source は ObservableCreate のプロパティ
       // (final ObservableOnSubscribe<T> source;)
       source.subscribe(parent);
    } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
    }
}

source.subscribe()

ObservableOnSubscribeの無名実装
new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
}

CreateEmitter.onNext()

CreateEmitter.onNext()
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        // observer は CreateEmitter のプロパティ
        // (final Observer<? super T> observer;)
        observer.onNext(t);
    }
}

⇒ onNext をただ呼び出しているだけ


ここまでのまとめ

  • Observer.subscribe()
    • subscribeActual() を呼び出す
    • subscribeActual() は abstract メソッド
    • 今回のケースでは、実装しているのは ObservableCreateObservableCreate.subscribeActual() が呼び出される
  • ObservableCreate.subscribeActual()
    • CreateEmitter<T> を生成
    • CreateEmitter<T> は渡された Observer をラップ
    • CreateEmitter<T>.onNext() が呼び出されると、Observer.onNext() がそのまま呼び出される

わかったこと1

Observable クラスには二つの役割がある。

  • Observable インスタンスを生成するための static ファクトリーメソッドを提供
  • Observable のインスタンスメソッドを提供

わかったこと2

Observable のメソッドは、基本的に Observable の派生クラスを生成して返す。

  • 例1:Observable.create()ObservableCreate を生成
  • 例2:Observable.map()ObservableMap を生成

わかったこと3

Observable.subscribe()Observable.subscribeActual() を呼び出している。

  • subscribeActual() は abstract メソッド
  • Observable を継承したクラスが性質に応じて subscribeActual() を実装

わかったこと4

ObservableCreate.subscribeActual() は次の処理を行っている。

  • Observer をラップするの CreateEmitter<T> を生成
  • ObservableOnSubscribe.subscribe() にその CreateEmitter<T> インスタンスを渡す
  • CreateEmitter<T>onNext()ObserveronNext() に転送

わかったこと5

このケースでは subscribe() をきっかけとして Observable が動き出す

  • ⇒ Cold な Observable である

次回


次回

  • map() などを使ったメソッドチェーンの実装について
  • Scheduler 指定の実装について(tranpoline の効果が分かるかも?)
  • Emitter を動的(?)にする方法(毎回 Observable を作らない)
0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3