なぜRxJava?
RxJavaは様々な特性を併せ持ったライブラリですが、簡単にまとめると以下のような機能に分類されます。
- List処理の抽象化・ストリーム化
- Optional
- Future/Promise
- Data Binding
- Event Bus
Java 8ではStream APIやOptionalが導入されていますが、Androidや業務要件などそのAPIを使えない環境も存在します。
また、非同期や並列などそもそもJavaで扱いにくい処理を、統一されたインターフェイスで簡潔に記述できるなどのメリットも多く、その基本的な機能を試してみたので紹介します。
なお、ここに記載する内容はRxJavaの使い方が中心で、RxJava自体の概念やFRPなどについては、他にも多くの方々紹介してくださっていますので、そちらをご覧いただければと思います。
また、今回のサンプルコードは単純化のためにfromやjust,rangeなど生成済みのストリームを使用していますが、本来Rxは時間的に変化するストリームも扱うことができるので、実運用で使用するときにはその辺りも考慮して使用する必要があります。
Use Case
細かな機能からだと、メリットがわかりにくくなるので、まずはいくつかのユースケースを紹介したいと思います。
Stream APIのようにCollectionを操作する
感覚的に理解しやすい同期的にCollectionの操作を行う例。
List<Integer> res = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6)) // IterableなオブジェクトからObservableを生成
.flatMap(Observable::just) // Stream APIのflatMap相当
.filter(i -> i % 2 == 0) // Stream APIのfilter相当
.map(i -> i * 2) // Stream APIのmap相当
.skip(1) // Stream APIのskip相当
.take(3) // Stream APIのlimit相当 ※同様の処理のlimit()も存在する
.toList() // ストリームをListに変換
.toBlocking() // 同期処理で行う
.single(); // 要素を取得
System.out.println(res); // [8, 12]
ファクトリとなるfrom()と同期的にListの変換を行うtoList().toBlocking().single()部分の記述に違いはありますが、ほとんどJava 8のStream APIと同じような感覚で操作が出来ます。
Optionalのように扱う
RxJavaの中心となっているObservableはOptionalに似た性質も持っているので、それを利用してOptionalのように扱うことも出来ます。
そのまま記述すると冗長になるので、今回はObservablesというユーティリティを作成しています。
@Test
public void Optionalのように扱う() throws Exception {
Observable<String> exist = Observables.ofNullable("val");
Observable<Object> empty = Observables.ofNullable(null);
// Optional#ifPresent
exist.subscribe(System.out::println); // val
empty.subscribe(System.out::println); // 表示処理自体も呼ばれない
// Optional#flatMap, map, filter
Observables.ofNullable("1,2,3,4")
.flatMap(str-> Observable.from(asList(str.split(","))))
.map(Integer::parseInt)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println);
// Optional#get
Observables.get(exist); // val
Observables.get(empty); // throw NoSuchElementException
// Optional#orElse
Observables.orElse(exist, "default"); // val
Observables.orElse(empty, "default"); // default
// Optional#isPresent
Observables.isPresent(exist); // true
Observables.isPresent(empty); // false
}
class Observables {
static <T> Observable<T> of(T value){
if(value == null){
throw new NullPointerException();
}
else{
return Observable.just(value);
}
}
static <T> Observable<T> ofNullable(T value){
return value == null ? Observable.empty() : Observable.just(value);
}
static <T> T get(Observable<T> observable){
return observable.toBlocking().single();
}
static <T> T orElse(Observable<T> observable, T other){
return observable.defaultIfEmpty(other).toBlocking().single();
}
static <T> boolean isPresent(Observable<T> observable){
return !observable.isEmpty().toBlocking().single();
}
static <T> T orElseGet(Observable<T> observable, Func0<T> other) {
return isPresent(observable) ? get(observable) : other.call();
}
static <T, X extends Throwable> T orElseThrow(Observable<T> observable, Func0<? extends X> other) throws X {
if (isPresent(observable)) {
return get(observable);
} else {
throw other.call();
}
}
}
Future/Promiseのように扱う
Observableは実行するスレッドを切り替えるためのメソッドを用意しているので、Web APIやDBアクセスなど重い処理を非同期化し合成することができます。
ObservableとFuture/Promiseとの違いは、Request/Responseのような一つの要素だけではなくイベントのような複数発生する要素をを扱えることです。
今回はFuture/Promiseと同様にObservableに単一の要素を追加して扱います。
@Test
public void FuturePromiseのように扱う() throws Exception {
// 直列+並列に処理する例
// 最初にユーザの認証情報を取得し、その認証情報を元に並列にユーザ情報を取得する例
Stopwatch sw = Stopwatch.createStarted();
getAuthId("id", "pass")
.flatMap(id -> Observable.zip(getUserInfo(id), getUserItems(id),
(info, items) -> info + " has items = " + items))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("All time: " + sw.stop());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
// Outpuot:
//
// getAuthId, ThreadName: RxCachedThreadScheduler-1, Params:id,pass
// getUserInfo, ThreadName: RxCachedThreadScheduler-2, Params: 123
// getUserItems, ThreadName: RxCachedThreadScheduler-3, Params: 123
// Tom has items = [Apple, Banana]
// All time: 4.191 s
}
Observable<Integer> getAuthId(String id, String pass) {
return Observable.create((Subscriber<? super Integer> subscriber) -> {
// 本来以下はWebやDBアクセスなど
try {
System.out.println(String.format("getAuthId, ThreadName: %s, Params:%s,%s", Thread.currentThread().getName(), id, pass));
Threads.sleep(1000);
subscriber.onNext(123);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io());
}
Observable<String> getUserInfo(Integer authId) {
return Observable.create((Subscriber<? super String> subscriber) -> {
// 本来以下はWebやDBアクセスなど
try {
System.out.println(String.format("getUserInfo, ThreadName: %s, Params: %d", Thread.currentThread().getName(), authId));
Threads.sleep(2000);
subscriber.onNext("Tom");
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io());
}
Observable<List<String>> getUserItems(Integer authId) {
return Observable.create((Subscriber<? super List<String>> subscriber) -> {
// 本来以下はWebやDBアクセスなど
try {
System.out.println(String.format("getUserItems, ThreadName: %s, Params: %d", Thread.currentThread().getName(), authId));
Threads.sleep(3000);
subscriber.onNext(asList("Apple", "Banana"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io());
}
各メソッドの処理時間が1+2+3なので6秒かかるはずの処理が並列に呼び出しを行っているので4秒強で完了します。
またこういった非同期、並列処理をHttp経由で行う際には、Retrofitと組み合わせることでより簡潔に記述できます。
クライアントサイドだと更に、Data BindingやEvent Busなどの機能も有効に活用できると思います。
RxJavaのAPI
RxJavaには非常に豊富なAPIが用意されているので、ここからはそのAPIをいくつか紹介します。
Observableの生成
RxJavaには多くのファクトリが用意されており、これらを使用することで様々なデータからObservableを生成することができます。
Creating Observables
from
IterableなオブジェクトからObservableを生成。
その他にも配列やFutureなど多くの引数からObservableを生成するオーバーロードが存在する。
Observable<List<Integer>> o = Observable.from(list);
create
OnSubscribeを指定してObservableを生成。
Observable<String> o = Observable.create(subscriber -> {
subscriber.onNext("Hello");
subscriber.onNext("world!");
subscriber.onCompleted();
});
range
要素の範囲を指定してObservableを生成。
Observable<Integer> o = Observable.range(1, 5);
just
引数で直接Observableを生成
Observable<Integer> o = Observable.just(1, 2, 3);
error
例外を持ったObservableを生成
Observable<Object> o = Observable.error(new RuntimeException("Error!"));
empty
空のObservableを生成
Observable<Object> o = Observable.empty();
interval
非同期で指定間隔ごとにsubscribeする。
toBlockingなどしてもブロックされない。
takeなどで回数制限を行うことができる。
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::print);
// 12345...
Observableの操作
生成したObservableには様々な操作を行うことができます。
StreamAPIのようにCollectionを操作するで記述した、map、flatMap、fileter、skip、takeなどもその一部です。
ここではその中でよく使いそうなものを紹介します。
map
Observable.range(1, 5)
.map(i -> i * 2)
.subscribe(System.out::print);
// 246810
flatMap
Observable.just(1, 2)
.flatMap(i -> Observable.range(i, 3))
.subscribe(System.out::print);
// 123234345
filter
Observable.range(1, 5)
.filter(i -> i % 2 == 0)
.subscribe(System.out::print);
// 24
skip
Observable.range(1, 5)
.skip(3)
.subscribe(System.out::print);
// 345
take
先頭から指定された数のアイテムを抽出する。
同様の処理のlimitも存在する。
Observable.range(1, 5)
.take(3)
.subscribe(System.out::print);
// 123
reduce
Observable.range(1, 3)
.reduce((sum, value) -> sum + value)
.subscribe(System.out::print);
// 6
scan
前回のアイテムも引数に取るmap。畳み込まないreduce。
Observable.range(1, 3)
.scan((sum, value) -> sum + value)
.subscribe(System.out::print);
// 136
collect
Observable.range(1, 5).collect(new ArrayList<>(), (li, i) -> li.add(i));
.subscribe(System.out::println);
// [1, 2, 3, 4, 5]
merge
複数のObservableを合成。flatMapを使う必要がない。
Observable.merge(Observable.from(list1), Observable.from(list2))
concat
複数のObservableを合成。
mergeとconcatの違いはmergeはObservableを合成する際に全てのストリームの順を考慮して合成するのに対し、concatは引数で渡されたObservableの順で合成すること。
Observable.concat(Observable.from(list1), Observable.from(list2));
zip
渡されたストリームのアイテムが全て揃った時点で呼び出される。
mergeやconcatとの違いは、異なる型のObservableを扱えること。
Observable<Integer> o1 = Observable.just(1, 3, 5);
Observable<Integer> o2 = Observable.just(2, 4, 6);
Observable
.zip(o1, o2, (d1, d2) -> d1 + " + " + d2 + " = " + (d1 + d2))
.subscribe(System.out::println);
// 2 + 1 = 3
// 4 + 3 = 7
// 6 + 5 = 11
combineLatest
zip同様に異なる型のObservableを扱うが、各アイテムのonNextが呼ばれた時点でもっとも近いストリームのアイテムを扱う。
groupBy
Observable.range(1, 5)
.groupBy(i -> i % 2)
.subscribe(grouped -> {
grouped.toList().subscribe(list -> System.out.println(format("key=%d,value=%s", grouped.getKey(), list)));
});
// key=0,value=[2, 4]
// key=1,value=[1, 3, 5]
distinct
重複を排除する。
また重複方法を選択する distinct(Func1 keySelector) も存在する。
Observable.from(asList(1, 2, 3, 3, 4, 5, 5))
.distinct()
.subscribe(System.out::print);
// 12345
window
ストリームを束ねて、新たなストリームを生成する。
束ね方により複数のオーバーロードが存在する。
Observable.range(1, 5)
.window(2)
.subscribe(i -> i.toList().subscribe(System.out::print));
// [1, 2][3, 4][5]
buffer
指定した間隔でストリームを分割したObservableのListを生成する。
数値の他に時間や関数など複数の引数をとるオーバーロードが存在する。
Observable.range(1, 5)
.buffer(2)
.subscribe(System.out::print);
// [1, 2][3, 4][5]
repeat
onCompletedされたときに再度subscribeするオペレータ。
repeat処理を指定する、repeatWhenも存在する。
Random r = new Random();
Observable.create(s -> {
s.onNext(r.nextInt());
s.onCompleted();
})
.repeat(3)
.subscribe(System.out::println);
// 1355134043
// 1691330796
// 1863470304
timeout
タイムアウトを設定し、タイムアウトした場合にはTimeoutExceptionが発生する
Observable.interval(3, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS)
.onErrorReturn(e -> {
System.out.println(e);
return -1L;
})
.subscribe(System.out::println);
// java.util.concurrent.TimeoutException
// -1
非同期・マルチスレッド・Scheduler
非同期、並列処理などでスレッドの切り替えを行う際には、Schedulerを利用します。
ここでは標準で用意されているSchedulerとその使い方について記載します。
Scheduler
標準で用意されているSchedulers
Scheduler | 動作 |
---|---|
computation | CPUバウンド用スレッドを生成する。データの加工など向け。コア数と同じ数のキャッシュされたスレッドを使ってイベント処理をする。I/O処理ってはいけない。 |
io | I/Oバウンド用のスレッドを生成する。DBアクセスなど向け。1つだけキャッシュされているスレッドを使ってイベント処理を行う。 |
trampoline | ThredLocalに処理をキューイングし現在実行中の処理が完了したら逐次実行する。 |
immediate | 現在のスレッドを使ってイベントを処理する。処理を遅延することはできるがキューイングすることはできない。 |
newThread | 作業単位ごとに新しいスレッドを作ってイベントを処理する。 |
subscribeOn
RxJavaの一連の処理を実行するスレッドを指定する。
Observable.range(1, 5)
.subscribeOn(Schedulers.newThread())
.subscribe(i -> System.out.print(Thread.currentThread().getName()));
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
observeOn
Observerの実行するスレッドを指定する。
Observable.range(1, 3)
.subscribeOn(Schedulers.newThread())
.map(i -> {
System.out.println("map: (" + Thread.currentThread().getName() + ")");
return i * 2;
})
.observeOn(Schedulers.computation())
.subscribe(i -> System.out.println("subscribe: " + i + " (" + Thread.currentThread().getName() + ")"));
// map: (RxNewThreadScheduler-1)
// map: (RxNewThreadScheduler-1)
// map: (RxNewThreadScheduler-1)
// subscribe: 2 (RxComputationThreadPool-3)
// subscribe: 4 (RxComputationThreadPool-3)
// subscribe: 6 (RxComputationThreadPool-3)
エラーハンドリング
RxJavaのメリットとして、通常処理単位で行わなければならないようなストリームの中の処理に対して、統一した記法でエラーハンドリングを行うことが出来ます。
onErrorReturn
例外が発生時の処理を行う。
同じ型のObservableを返り値とするonErrorResumeNextも存在する。
Observable.range(1, 5)
.doOnNext(i -> {
if (i == 2) {
throw new RuntimeException("Error!!");
}
})
.onErrorReturn(e -> {
System.out.println(e.getMessage());
return -1;
})
.subscribe(System.out::println);
// 1
// Error!!
// -1
retry
例外が発生しても指定の回数リトライを行う。
リトライ処理を細かく指定できるObservable#retryWhenも存在する。
Observable.range(1, 5)
.doOnNext(i -> {
if (new Random().nextInt(3) == 1) {
throw new RuntimeException("Error!");
}
})
.retry(1)
.onErrorReturn(e -> {
System.out.println(e);
return -1;
})
.subscribe(System.out::println);
// 1
// 2
// 1
// 2
// 3
// java.lang.RuntimeException: Error!
// -1
まとめ
ここで紹介したAPIはほんの一部で、それ以外にもHot/Coldの性質やData Binding、Event Bus、Reactive Streams対応など多くの機能が存在するので、その辺りはまた時間が取れた時に追加していこうと思います。
参考
RxJava GitHub
RxJava Javadoc
Web+DB Press val.81 Javaの鉱脈
RxのHotとColdについて
Mastering observables
【翻訳】あなたが求めていたリアクティブプログラミング入門
RxJava Schedulers Use Cases
Rx逆引き