Edited at

RxJava 勉強会 Week2

More than 1 year has passed since last update.


目次


  • 前回の復習

  • Observable

  • Operators

  • Scheduler

  • Tumpaca x RxJava



前回の復習



ReactiveX


  • Observable = 非同期プログラム × 複数のデータ

Integer[] numbers = {1, 2, 3, 4, 5};

Observable<Integer> observable
= Observable.fromArray(numbers);
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});



Observable



Observable, Observe, Subscribe



  • Observable: Item を発行(emit)する実体


  • Observer: Observable が発行した Item を受け取る実体


  • Subscribe: Observer が Observable を監視すること


    • 『Observe は Observable を Subscribe する』





Observable Contract


  • Observable と Observer の契約

  • ReactiveX ではこれらを Notification と呼ぶ

ref: http://reactivex.io/documentation/contract.html



Observable から Observer への Notification



  • OnNext(Item): Observable が Item を emit したときの通知


  • OnCompleted(): Observable が成功して、これ以上 Item を emit しない時の通知


  • OnError(Error): Observable がエラーで中断し、これ以上 Item を emit しない時の通知


  • OnSubscribe()(任意): Observable が Observer からの Request 通知(後述)を受け取る準備ができたときの通知



    • Backpressure と呼ばれる機能と関係する





RxJava: Observer インタフェース



  • interface Observer<T>


    • void onNext(T t);

    • void onError(Throwable e);

    • void onComplete();

    • void onSubscribe(Disposable d);





Observer から Observable への Notification


  • Subscribe: Observer 側が通知を受け取る準備ができたことを Observable に指示する

  • Unsubscribe: Observer 側が通知を受け取る必要がなくなったことを Observable に指示する

  • Request(任意): 一定数の OnNext 通知以上は必要ないことを Observable に通知する



    • Backpressure と呼ばれる機能と関係がある





RxJava


  • Subscribe


    • Observable.subscribe(Observer observer);



  • Unsubscribe


    • RxJava 1.x: Subscription.unsubscribe();

    • RxJava 2.x: Disposable.dispose();



  • Request


    • RxJava 1.x: Subscriber.request(int n);

    • RxJava 2.x: Subscription.request(int n);





イメージ図

スクリーンショット 2017-02-03 9.46.15.png



Sample Codes

https://github.com/benkyokai/RxJavaStudy/tree/master/Week1/src/main/java/com/hjm/week2/Week2.java



Sample1 (1)

Observable の作成

Integer[] numbers = {1, 2, 3, 4, 5};

Observable<Integer> observable =
Observable.fromArray(numbers);



Sample1 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {

@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe(): " + d);
}
@Override public void onNext(Integer integer) {
System.out.println("onNext(): " + integer);
}
@Override public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override public void onComplete() {
System.out.println("onComplete()");
}
};



Sample1 (3)

Observable を subscribe

observable.subscribe(observer);



Sample1 (4)

実行結果

onSubscribe(): io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@619a5dff

onNext(): 1
onNext(): 2
onNext(): 3
onNext(): 4
onNext(): 5
onComplete()



RxJava: Consumer と Action



  • interface Consumer<T>


    • void accept(T t) throws Exception;




  • interface Action


    • void run() throws Exception;



  • メモ


    • Java8 の Lambda を使うのに便利

    • Action と java.lang.Runnable との違い:Exception を throws 可





Sample2

onNext, onError, onComplete, onSubscribe を個別に作って subscribe

Consumer<Integer> onNext = new Consumer<Integer>() {

@Override public void accept(Integer integer) throws Exception {
System.out.println("onNext(): " + integer);
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
System.out.println("onError(): " + e);
}
};
Action onComplete = new Action() {
@Override public void run() throws Exception {
System.out.println("onComplete()");
}
};
Consumer<Disposable> onSubscribe = new Consumer<Disposable>() {
@Override public void accept(Disposable disposable) throws Exception {
System.out.println("onSubscribe(): " + disposable);
}
};
observable.subscribe(onNext, onError, onComplete, onSubscribe);



Sample3

ラムダで一気に subscribe

(メソッド参照で更に省略可)

observable.subscribe(

integer -> {
System.out.println("onNext(): " + integer);
},
e -> {
System.out.println("onError(): " + e);
},
() -> {
System.out.println("onComplete()");
},
disposable -> {
System.out.println("onSubscribe(): " + disposable);
});



Sample4

3回目の OnNext で throw new RuntimeException()

Consumer<Integer> onNext = new Consumer<Integer>() {

@Override public void accept(Integer integer) throws Exception {
if (integer != null && integer == 3) {
throw new RuntimeException("A number is three.");
}
System.out.println("onNext(): " + integer);
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
System.out.println("onError(): " + e);
}
};
observable.subscribe(onNext, onError, onComplete);



Sample4

実行結果

onNext(): 1

onNext(): 2
onError(): java.lang.RuntimeException: A number is three.



Notification Contracts


  • Observable は OnNext() 通知を 0 以上 Observer に送る

  • OnError() か OnCompleted() が送られたら、以降、通知は送られない

  • 通知がまったく送られない場合がある


    • この場合は Observable はまだ Active





Observable を作成


  • 1, 2, 3 を通知する Observable を作成



Observable とは


  • Observable = Observer に通知を送る実体

  • どのタイミングで通知を開始?


    • Observer が subscribe した時(Cold Observable)



⇒ Observer が Observable を Subscribe したタイミングで、

OnNext(1), OnNext(2), OnNext(3), OnCompleted() を通知するクラスを作る



Sample5

1, 2, 3 を返す Observable の作成

Observable<Integer> observable =

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});



Sample5

実行結果

example5

onSubscribe(): null // なぜ?
onNext(): 1
onNext(): 2
onNext(): 3
onComplete()



Operators



Operator とは


  • Operator = 下図の四角い箱



注意事項

実装によって、


  • 名前が違う

  • 特別な Operator がある

  • 一定の Operator が存在しない

ということがあります。



Operator の種類


  • Observable を作成(Create, From)

  • Observable を変換(Map)

  • Observable をフィルタ(Filter)

  • などなど、、、

必要な時はここから探す

http://reactivex.io/documentation/operators.html



Operator Chain


  • Operator は Chain できる

  • Builder Pattern に似ている


    • しかし、順番が重要という点で Builder Patter と異なる



observable

.skip(10)
.take(5)
.map(i -> i + "transformed")



Scheduler



Scheduler


  • 特定の Operator を違うスレッドで実行したい


    • ⇒ 実行したい Scheduler を Observable に指定する



  • 指定方法


    • SubscribeOn()

    • ObserveOn()





SubscribeOn()


  • Observable が動作する Scheduler を指定する

  • Operator Chain のどこで指定してもよい

  • 「ObservableOnSubscribe.subscribe() が実行されるスレッドの指定」と理解するとよい



ObserveOn()


  • Observable から Observer に通知する際の Scheduler を指定する

  • Operator Chain で指定した後ろの Operator に適用される

  • 「Observer のコールバックが実行されるスレッド」と理解するとよい



イメージ図



Sample6 (1)

Observable の作成

Observable<Integer> observable =

Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
System.out.println("subscribe() end");
}
});



Sample6 (2)

Observer の作成

Observer<Integer> observer = new Observer<Integer>() {

@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe(): " + d);
}
@Override public void onNext(Integer n) {
System.out.println("onNext(): " + n);
}
@Override public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override public void onComplete() {
System.out.println("onComplete()");
}
};



Sample6 (3)

subscribeOn() と observeOn() を指定し subscribe()

observable

.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(observer);



Tumpaca x RxJava



Tumpaca x RxJava


  • reblog を RxJava 化



作業履歴

https://github.com/benkyokai/tumpaca/tree/feature/shiozawa/introducting-rxjava



次回以降



学ぶべきこと


  • Hot and Cold

  • Backpressure

  • Implements Operators