RxJava

RxJava 勉強会 Week2

More than 1 year has passed since last update.

目次

  • 前回の復習
  • Observable
  • Operators
  • Scheduler
  • Tumpaca x RxJava

前回の復習


ReactiveX

  • Observable = 非同期プログラム × 複数のデータ
Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable
    = Observable.fromArray(numbers);
observable.subscribe(new Consumer<Integer>() {
    @Override 
    public void accept(Integer integer) throws Exception {
        System.out.println(integer);
    }
});

Observable


Observable, Observe, Subscribe

  • Observable: Item を発行(emit)する実体
  • Observer: Observable が発行した Item を受け取る実体
  • Subscribe: Observer が Observable を監視すること
    • 『Observe は Observable を Subscribe する』

Observable Contract

  • Observable と Observer の契約
  • ReactiveX ではこれらを Notification と呼ぶ

ref: http://reactivex.io/documentation/contract.html


Observable から Observer への Notification

  • OnNext(Item): Observable が Item を emit したときの通知
  • OnCompleted(): Observable が成功して、これ以上 Item を emit しない時の通知
  • OnError(Error): Observable がエラーで中断し、これ以上 Item を emit しない時の通知
  • OnSubscribe()(任意): Observable が Observer からの Request 通知(後述)を受け取る準備ができたときの通知
    • Backpressure と呼ばれる機能と関係する

RxJava: Observer インタフェース

  • interface Observer<T>
    • void onNext(T t);
    • void onError(Throwable e);
    • void onComplete();
    • void onSubscribe(Disposable d);

Observer から Observable への Notification

  • Subscribe: Observer 側が通知を受け取る準備ができたことを Observable に指示する
  • Unsubscribe: Observer 側が通知を受け取る必要がなくなったことを Observable に指示する
  • Request(任意): 一定数の OnNext 通知以上は必要ないことを Observable に通知する
    • Backpressure と呼ばれる機能と関係がある

RxJava

  • Subscribe
    • Observable.subscribe(Observer observer);
  • Unsubscribe
    • RxJava 1.x: Subscription.unsubscribe();
    • RxJava 2.x: Disposable.dispose();
  • Request
    • RxJava 1.x: Subscriber.request(int n);
    • RxJava 2.x: Subscription.request(int n);

イメージ図

スクリーンショット 2017-02-03 9.46.15.png


Sample Codes

https://github.com/benkyokai/RxJavaStudy/tree/master/Week1/src/main/java/com/hjm/week2/Week2.java


Sample1 (1)

Observable の作成

Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable = 
    Observable.fromArray(numbers);

Sample1 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {
    @Override public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe(): " + d);
    }
    @Override public void onNext(Integer integer) {
        System.out.println("onNext(): " + integer);
    }
    @Override public void onError(Throwable e) {
        System.out.println("onError(): " + e);
    }
    @Override public void onComplete() {
        System.out.println("onComplete()");
    }
};

Sample1 (3)

Observable を subscribe

observable.subscribe(observer);

Sample1 (4)

実行結果

onSubscribe(): io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@619a5dff
onNext(): 1
onNext(): 2
onNext(): 3
onNext(): 4
onNext(): 5
onComplete()

RxJava: Consumer と Action

  • interface Consumer<T>
    • void accept(T t) throws Exception;
  • interface Action
    • void run() throws Exception;
  • メモ
    • Java8 の Lambda を使うのに便利
    • Action と java.lang.Runnable との違い:Exception を throws 可

Sample2

onNext, onError, onComplete, onSubscribe を個別に作って subscribe

Consumer<Integer> onNext = new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        System.out.println("onNext(): " + integer);
    }
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
    @Override public void accept(Throwable e) throws Exception {
        System.out.println("onError(): " + e);
    }
};
Action onComplete = new Action() {
    @Override public void run() throws Exception {
        System.out.println("onComplete()");
    }
};
Consumer<Disposable> onSubscribe = new Consumer<Disposable>() {
    @Override public void accept(Disposable disposable) throws Exception {
        System.out.println("onSubscribe(): " + disposable);
    }
};
observable.subscribe(onNext, onError, onComplete, onSubscribe);

Sample3

ラムダで一気に subscribe
(メソッド参照で更に省略可)

observable.subscribe(
    integer -> {
        System.out.println("onNext(): " + integer);
    },
    e -> {
        System.out.println("onError(): " + e);
    },
    () -> {
        System.out.println("onComplete()");
    },
    disposable ->  {
        System.out.println("onSubscribe(): " + disposable);
    });

Sample4

3回目の OnNext で throw new RuntimeException()

Consumer<Integer> onNext = new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
        if (integer != null && integer == 3) {
            throw new RuntimeException("A number is three.");
        }
        System.out.println("onNext(): " + integer);
    }
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
    @Override public void accept(Throwable e) throws Exception {
        System.out.println("onError(): " + e);
    }
};
observable.subscribe(onNext, onError, onComplete);

Sample4

実行結果

onNext(): 1
onNext(): 2
onError(): java.lang.RuntimeException: A number is three.

Notification Contracts

  • Observable は OnNext() 通知を 0 以上 Observer に送る
  • OnError() か OnCompleted() が送られたら、以降、通知は送られない
  • 通知がまったく送られない場合がある
    • この場合は Observable はまだ Active

Observable を作成

  • 1, 2, 3 を通知する Observable を作成

Observable とは

  • Observable = Observer に通知を送る実体
  • どのタイミングで通知を開始?
    • Observer が subscribe した時(Cold Observable)

⇒ Observer が Observable を Subscribe したタイミングで、
OnNext(1), OnNext(2), OnNext(3), OnCompleted() を通知するクラスを作る


Sample5

1, 2, 3 を返す Observable の作成

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    });

Sample5

実行結果

example5
onSubscribe(): null // なぜ?
onNext(): 1
onNext(): 2
onNext(): 3
onComplete()

Operators


Operator とは

  • Operator = 下図の四角い箱


注意事項

実装によって、

  • 名前が違う
  • 特別な Operator がある
  • 一定の Operator が存在しない

ということがあります。


Operator の種類

  • Observable を作成(Create, From)
  • Observable を変換(Map)
  • Observable をフィルタ(Filter)
  • などなど、、、

必要な時はここから探す
http://reactivex.io/documentation/operators.html


Operator Chain

  • Operator は Chain できる
  • Builder Pattern に似ている
    • しかし、順番が重要という点で Builder Patter と異なる
observable
  .skip(10)
  .take(5) 
  .map(i -> i + "transformed") 

Scheduler


Scheduler

  • 特定の Operator を違うスレッドで実行したい
    • ⇒ 実行したい Scheduler を Observable に指定する
  • 指定方法
    • SubscribeOn()
    • ObserveOn()

SubscribeOn()

  • Observable が動作する Scheduler を指定する
  • Operator Chain のどこで指定してもよい
  • 「ObservableOnSubscribe.subscribe() が実行されるスレッドの指定」と理解するとよい

ObserveOn()

  • Observable から Observer に通知する際の Scheduler を指定する
  • Operator Chain で指定した後ろの Operator に適用される
  • 「Observer のコールバックが実行されるスレッド」と理解するとよい

イメージ図


Sample6 (1)

Observable の作成

Observable<Integer> observable =
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
            System.out.println("subscribe() end");
        }
    });

Sample6 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {
    @Override public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe(): " + d);
    }
    @Override public void onNext(Integer n) {
        System.out.println("onNext(): " + n);
    }
    @Override public void onError(Throwable e) {
        System.out.println("onError(): " + e);
    }
    @Override public void onComplete() {
        System.out.println("onComplete()");
    }
};

Sample6 (3)

subscribeOn() と observeOn() を指定し subscribe()

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.computation())
    .subscribe(observer);

Tumpaca x RxJava


Tumpaca x RxJava

  • reblog を RxJava 化

作業履歴

https://github.com/benkyokai/tumpaca/tree/feature/shiozawa/introducting-rxjava


次回以降


学ぶべきこと

  • Hot and Cold
  • Backpressure
  • Implements Operators