目次
- 前回の復習
- Observable
- Operators
- Scheduler
- Tumpaca x RxJava
前回の復習
ReactiveX
- Observable = 非同期プログラム × 複数のデータ
Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable
= Observable.fromArray(numbers);
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
Observable
Observable, Observe, Subscribe
- Observable: Item を発行(emit)する実体
- Observer: Observable が発行した Item を受け取る実体
-
Subscribe: Observer が Observable を監視すること
- 『Observe は Observable を Subscribe する』
Observable Contract
- Observable と Observer の契約
- ReactiveX ではこれらを Notification と呼ぶ
ref: http://reactivex.io/documentation/contract.html
Observable から Observer への Notification
- OnNext(Item): Observable が Item を emit したときの通知
- OnCompleted(): Observable が成功して、これ以上 Item を emit しない時の通知
- OnError(Error): Observable がエラーで中断し、これ以上 Item を emit しない時の通知
-
OnSubscribe()(任意): Observable が Observer からの Request 通知(後述)を受け取る準備ができたときの通知
- Backpressure と呼ばれる機能と関係する
RxJava: Observer インタフェース
-
interface Observer<T>
void onNext(T t);
void onError(Throwable e);
void onComplete();
void onSubscribe(Disposable d);
Observer から Observable への Notification
- Subscribe: Observer 側が通知を受け取る準備ができたことを Observable に指示する
- Unsubscribe: Observer 側が通知を受け取る必要がなくなったことを Observable に指示する
- Request(任意): 一定数の OnNext 通知以上は必要ないことを Observable に通知する
- Backpressure と呼ばれる機能と関係がある
RxJava
- Subscribe
Observable.subscribe(Observer observer);
- Unsubscribe
- RxJava 1.x:
Subscription.unsubscribe();
- RxJava 2.x:
Disposable.dispose();
- RxJava 1.x:
- Request
- RxJava 1.x:
Subscriber.request(int n);
- RxJava 2.x:
Subscription.request(int n);
- RxJava 1.x:
イメージ図
Sample Codes
Sample1 (1)
Observable の作成
Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable =
Observable.fromArray(numbers);
Sample1 (2)
Observer の作成
Observer<Integer> observer = new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe(): " + d);
}
@Override public void onNext(Integer integer) {
System.out.println("onNext(): " + integer);
}
@Override public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override public void onComplete() {
System.out.println("onComplete()");
}
};
Sample1 (3)
Observable を subscribe
observable.subscribe(observer);
Sample1 (4)
実行結果
onSubscribe(): io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@619a5dff
onNext(): 1
onNext(): 2
onNext(): 3
onNext(): 4
onNext(): 5
onComplete()
RxJava: Consumer と Action
-
interface Consumer<T>
void accept(T t) throws Exception;
-
interface Action
void run() throws Exception;
- メモ
- Java8 の Lambda を使うのに便利
- Action と java.lang.Runnable との違い:Exception を throws 可
Sample2
onNext, onError, onComplete, onSubscribe を個別に作って subscribe
Consumer<Integer> onNext = new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
System.out.println("onNext(): " + integer);
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
System.out.println("onError(): " + e);
}
};
Action onComplete = new Action() {
@Override public void run() throws Exception {
System.out.println("onComplete()");
}
};
Consumer<Disposable> onSubscribe = new Consumer<Disposable>() {
@Override public void accept(Disposable disposable) throws Exception {
System.out.println("onSubscribe(): " + disposable);
}
};
observable.subscribe(onNext, onError, onComplete, onSubscribe);
Sample3
ラムダで一気に subscribe
(メソッド参照で更に省略可)
observable.subscribe(
integer -> {
System.out.println("onNext(): " + integer);
},
e -> {
System.out.println("onError(): " + e);
},
() -> {
System.out.println("onComplete()");
},
disposable -> {
System.out.println("onSubscribe(): " + disposable);
});
Sample4
3回目の OnNext で throw new RuntimeException()
Consumer<Integer> onNext = new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
if (integer != null && integer == 3) {
throw new RuntimeException("A number is three.");
}
System.out.println("onNext(): " + integer);
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
System.out.println("onError(): " + e);
}
};
observable.subscribe(onNext, onError, onComplete);
Sample4
実行結果
onNext(): 1
onNext(): 2
onError(): java.lang.RuntimeException: A number is three.
Notification Contracts
- Observable は OnNext() 通知を 0 以上 Observer に送る
- OnError() か OnCompleted() が送られたら、以降、通知は送られない
- 通知がまったく送られない場合がある
- この場合は Observable はまだ Active
Observable を作成
- 1, 2, 3 を通知する Observable を作成
Observable とは
- Observable = Observer に通知を送る実体
- どのタイミングで通知を開始?
- Observer が subscribe した時(Cold Observable)
⇒ Observer が Observable を Subscribe したタイミングで、
OnNext(1), OnNext(2), OnNext(3), OnCompleted() を通知するクラスを作る
Sample5
1, 2, 3 を返す Observable の作成
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
Sample5
実行結果
example5
onSubscribe(): null // なぜ?
onNext(): 1
onNext(): 2
onNext(): 3
onComplete()
Operators
Operator とは
- Operator = 下図の四角い箱
注意事項
実装によって、
- 名前が違う
- 特別な Operator がある
- 一定の Operator が存在しない
ということがあります。
Operator の種類
- Observable を作成(Create, From)
- Observable を変換(Map)
- Observable をフィルタ(Filter)
- などなど、、、
必要な時はここから探す
http://reactivex.io/documentation/operators.html
Operator Chain
- Operator は Chain できる
- Builder Pattern に似ている
- しかし、順番が重要という点で Builder Patter と異なる
observable
.skip(10)
.take(5)
.map(i -> i + "transformed")
Scheduler
Scheduler
- 特定の Operator を違うスレッドで実行したい
- ⇒ 実行したい Scheduler を Observable に指定する
- 指定方法
- SubscribeOn()
- ObserveOn()
SubscribeOn()
- Observable が動作する Scheduler を指定する
- Operator Chain のどこで指定してもよい
- 「ObservableOnSubscribe.subscribe() が実行されるスレッドの指定」と理解するとよい
ObserveOn()
- Observable から Observer に通知する際の Scheduler を指定する
- Operator Chain で指定した後ろの Operator に適用される
- 「Observer のコールバックが実行されるスレッド」と理解するとよい
イメージ図
Sample6 (1)
Observable の作成
Observable<Integer> observable =
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
System.out.println("subscribe() end");
}
});
Sample6 (2)
Observer の作成
Observer<Integer> observer = new Observer<Integer>() {
@Override public void onSubscribe(Disposable d) {
System.out.println("onSubscribe(): " + d);
}
@Override public void onNext(Integer n) {
System.out.println("onNext(): " + n);
}
@Override public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override public void onComplete() {
System.out.println("onComplete()");
}
};
Sample6 (3)
subscribeOn() と observeOn() を指定し subscribe()
observable
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(observer);
Tumpaca x RxJava
Tumpaca x RxJava
- reblog を RxJava 化
作業履歴
次回以降
学ぶべきこと
- Hot and Cold
- Backpressure
- Implements Operators