目次
- ReactiveX
- Sync vs Async
- RxJava
- Operators
ReactiveX
ReactiveX とは
- 非同期プログラムのためのライブラリ
- 複数のデータ(イベント)を中心に据える
- Observable Sequence を使う
- 低レイヤ(threading, synchronization, thread-safety, concurrency...)を隠蔽
- ReactiveX の Java 実装: RxJava
ref: http://reactivex.io/intro.html
Sync vs Async
同期的なプログラミング
単一データを取得:T getData()
// 単一データを同期的に取得して出力
Integer i = getData();
System.out.println(i);
同期的なプログラミング
複数データを取得:Iterable<T> getData
// 複数データを同期的に取得して出力
Iterable<Integer> iter = getData();
for (Integer i : iter) {
System.out.println(i);
}
問題
- getData() で処理がブロックする
- 複数データ:すべてのデータが揃わないと出力できない
非同期的なプログラミング
単一データ:Future<T> getData()
Future<Integer> f = getData();
...(その他の処理)...
Integer i = f.get();
System.out.println(i);
非同期的なプログラミング
複数データの取得:Observable<T> getData();
Observable<Integer> observable = getData();
observable.subscribe(i -> System.out.println(i));
...(その他の処理)...
利点
- getData() で処理がブロックしない
- 複数データ:すべてのデータが揃わなくても処理が進む
ReactiveX とは
- 非同期プログラム × 複数のデータ
- デザインパターンの Observer Pattern を複数化したもの
- Observables が中心
- 実装は何でも良い
- シングルスレッド、マルチスレッド、Actor モデル、、、
RxJava
RxJava
- Reactive X の JVM 向けライブラリ
- Java で実装されている
- 1.x と 2.x で全然違う
- 2.x を中心に解説
準備
- gradle に次を設定
compile "io.reactivex.rxjava2:rxjava:2.0.4"
サンプルコード
- Pure Java
- github: TODO
Observable の生成
// 配列を元にして Observable を生成
Integer[] numbers = {1, 2, 3, 4, 5};
Observable<Integer> observable
= Observable.fromArray(numbers);
⇒ Observable に factory メソッドがたくさんある
subscribe
// Observables を subscribe する
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
実行すると
1
2
3
4
5
単一 Item の Observable
// 単一アイテムの Observable
Observable<Integer> observable = Observable.just(1);
⇒ 2.x では、Single を使った方がよい?(要確認)
Java 8 のラムダを利用
// Java 8 のラムダ構文
observable.subscribe(integer -> {
System.out.println(integer);
});
メソッド参照をつかうとさらに簡単に
observable.subscribe(System.out::println);
非同期処理
// 5秒後に 1 を生成する Observable
Future<Integer> future = Executors.newSingleThreadExecutor().submit(() -> {
// バックグラウンドスレッドで実行
Thread.sleep(5000);
return 1;
});
Observable<Integer> observable = Observable.fromFuture(future);
observable.subscribe(System.out::println); // 終了しないので注意
Operators
コレクション操作
- ReactiveX はコレクション操作が重要
- ReactiveX ⇒ Observables が中心
- Observables ⇒ 非同期 × 複数データ
- 複数データ ⇒ コレクション
- 便利なコレクションメソッドを用意している
例:
データ列の11~15個目の要素を "N transformed" という形式で順に出力
同期的な手法
Iterable<T>
の場合:
Iterable<Integer> iter = getDataFromLocal();
iter.skip(10)
.take(5)
.map(s -> s + "transformed")
.forEach(s -> System.out.println(s));
非同期的な手法
Observable<T>
の場合:
Observable<Integer> observable = getDataFromNetwork();
observable.skip(10)
.take(5)
.map(i -> i + "transformed")
.subscribe(s -> System.out.println(s));
Marble Diagrams
- メソッドの効果説明に使われる図

ref: http://reactivex.io/documentation/observable.html
Pull vs Push
- Pull
- 同期的手法は Pull
- データを消費する側が、データを Pull する
- Push
- 非同期的手法は Push
- データを生成する側が、データを Push する
すべてを Push に
ReactiveX 創始者 Erik Meijer:
世界の最先端にい続けたいのなら,“push”すること,つまりリアクティブであることが必要だ,と氏は言う。“pull”を使うというのは,基本的にはプログラムに,あらゆる種類のブロックポイントを加えるということであるから非効率的だ,と氏は言う。
ref: https://www.infoq.com/jp/news/2015/12/erik-meijer-hacker-way
次回
- Android で ReactiveX を使う?