概要
「何か流行っているけどよくわからない」程度の認識だった RxJava について、先日『WEB+DB PRESS』 vol.81 の「Javaの鉱脈」を読んで、**「実装したい処理に合わせて Observable を適切に独自定義する」**ことが RxJava を考える上で重要だと気付きました。RxJava をどういう風に使うものなのか、自分なりの理解ができたので書いてみます。
実行環境
Java SE 8で動かします。もちろん Lambda 式を使います。ご了承ください。
Java | SE 1.8.0_91 |
---|---|
Eclipse | Mars 4.5.2 |
RxJava | 1.1.0 |
ソースコード
今回のソースコードは GitHub に置きました。
ソースコード(GitHub)
RxJava とは?
リアクティヴプログラミング(後述)を Java でやるためのライブラリです。Rx とは Reactive Extensions の略だそうです。なお、Java での Reactive Extensions 実装はほかに Reactor Core というものがあるそうです。詳しくは「Reactor Core 2.5: もう一つのJava向けReactive Extensions実装」をご覧ください。
そもそもリアクティヴプログラミングとは?
連続的なデータをイベントハンドラで処理していくプログラミングスタイル
『WEB+DB PRESS』 vol.81 「Javaの鉱脈」 より
???
で、Rx すると何がよいのか?
下記3つを宣言的に記述可能です。
- 逐次発生するデータの観測
- 適切なデータの選択
- 発生したデータに対する処理
自分の実装したい処理に RxJava が適しているのかを考える際は上記3点を用いるとよいでしょう。私が考え付いた例を挙げてみます。
(例) ファイルウォッチャー
Markdown エディタにて、ローカルの Markdown ファイル更新を検知→プレビュー用 WebView をリロード
項目 | 例 |
---|---|
逐次発生するデータ | 更新されたファイルの Path オブジェクト |
適切なデータの選択 | ファイルであること、コピーが可能であること |
発生したデータに対する処理 | ファイルをコピー、プレビューの更新 |
(例) クローラー
項目 | 例 |
---|---|
逐次発生するデータ | リクエストURL |
適切なデータの選択 | 適切なURLであること、robots.txt でクロールが許可されていること |
発生したデータに対する処理 | URL のコンテンツを取得、DBへの格納 |
(例) RxKafka
メッセージキュー Kafka にデータが溜まったら逐次 Consume して何かしらの処理をさせます。
項目 | 例 |
---|---|
逐次発生するデータ | Kafka のメッセージ |
適切なデータの選択 | データ形式が正当 |
発生したデータに対する処理 | データのパース等 |
なお Scala にはそのようなライブラリがすでにあるようです。
RxJava の API
rx.Observable
RxJava の核ともいえる部分です。この1クラスだけで1万行近く(空行・コメント含む)あります。 RxJava を使う上で必要なメソッドが定義されています。
Observable の static factory method
いくつかありますが、実際は create をほぼ使うことになると思います。
Method name | Variable |
---|---|
from | オブジェクトの配列 |
just | 10個までのオブジェクト(可変長引数ではない) |
range | int のみ、range(1,10) で1から10までの要素を持つ Observable を生成 |
create | OnSubscribe から Observable を生成 |
イベント処理用のオペレータメソッド
Java8 の Stream API でも見かける filter や map という名前は、高階関数(Wikipedia)のオペレータとして標準的な名前のようです。主なものを下記に挙げます。Observable を返すのでメソッドチェインでつなげて記述することが可能です。
Operator name | Description |
---|---|
filter(Func1 super T, Boolean> predicate) | 残す値を決定 |
map(Func1 super T, ? extends R> func) | 元の値を変更して次に送る。型も変更可能 |
cache() | Observable をキャッシュして再利用可能にする |
retry(final long count) | 処理を複数回試行させる。long で最大回数を指定可能 |
RxJava が開発された当時は Java の標準に関数型プログラミングをするためのクラスが揃っていなかったので、独自に実装してあります。
実行するスレッドを変更するメソッド
処理を実行する Scheduler(rx.Scheduler) を指定します。
Method name | Description |
---|---|
observeOn(Scheduler scheduler) | observer の実行スレッドを選択、実行後から変更 |
subscribeOn(Scheduler scheduler) | operator の実行スレッドを選択/ 最初に指定したスレッドを使う/ 途中で指定しても最初のOperatorから指定したスレッドを使う |
rx.Scheduler
処理を実行するスレッドの実体を保持するもので、非同期処理を書く際に重要となります。Schedulers クラスの static factory method から選択します。
Scheduler factory method | Description |
---|---|
immediate() | 現在のスレッドですぐに動かす |
trampoline() | 現在のスレッドに処理をプールして徐々に動かす |
newThread() | 都度新しいスレッドで動かす |
computation() | キャッシュされたスレッドで動かす。I/O処理以外で使う |
io() | キャッシュされたスレッドで動かす。I/O処理で使う |
from(Executor executor) | 指定された Executor で動かす |
test() | デバッグ用 |
rx.Observer
イベントの処理をするインタフェイスです。
Method name | Description | Times |
---|---|---|
void onNext(T t) | 要素1つ1つの処理を記述 | 複数回 |
void onCompleted() | 終了する際の処理を記述 | 1回のみ |
void onError(Throwable e) | 例外発生時の処理を記述 | 1回のみ |
実際のプログラミングでは Observable を生成する際に適宜定義するため、これを直接実装することはほぼありません。
クイックスタート
フォルダを作って移動
$ mkdir vRxJava
$ cd vRxJava
プロジェクトをgradleで初期化
vRxJava> $ gradle init --type java-library
:wrapper
:init
生成された build.gradle に RxJava の依存を追加
apply plugin:'application'
mainClassName = 'RxFizzBuzz'
repositories {
mavenCentral()
}
dependencies {
compile 'io.reactivex:rxjava:1.1.0'
}
依存を解決
vRxJava> gradle dependencies
Download https://jcenter.bintray.com/io/reactivex/rxjava/1.1.0/rxjava-1.1.0.pom
Download https://jcenter.bintray.com/io/reactivex/rxjava/1.1.0/rxjava-1.1.0.jar
:dependencies
------------------------------------------------------------
Root project
------------------------------------------------------------
archives - Configuration for archive artifacts.
No dependencies
compile - Compile classpath for source set 'main'.
\--- io.reactivex:rxjava:1.1.0
default - Configuration for default artifacts.
\--- io.reactivex:rxjava:1.1.0
runtime - Runtime classpath for source set 'main'.
\--- io.reactivex:rxjava:1.1.0
試しにFizzBuzz する
本当に RxJava を使っているだけで全然リアクティヴでもなんでもないコードです。
import rx.Observable;
public class RxFizzBuzz {
public static final void main(final String[] args) {
Observable.range(1, 100)
.map(i -> {
if (i % 15 == 0) {
return "FizzBuzz";
}
if (i % 3 == 0) {
return "Fizz";
}
if (i % 5 == 0) {
return "Buzz";
}
return Integer.toString(i);
})
.subscribe(
(i) -> {System.out.print(i + ", ");},
(e) -> e.printStackTrace(),
System.out::println
);
}
}
実行
\>gradle run
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
1, 2, Fizz, 4, Buzz, Fizz, 7, 8, Fizz, Buzz, 11, Fizz, 13, 14, FizzBuzz, 16, 17, Fizz, 19, Buzz, Fizz, 22, 23, Fizz, Buzz, 26, Fizz, 28, 29, FizzBuzz, 31, 32, Fizz, 34, Buzz, Fizz, 37, 38, Fizz, Buzz, 41, Fizz, 43, 44, FizzBuzz, 46, 47, Fizz, 49, Buzz, Fizz, 52, 53, Fizz, Buzz, 56, Fizz, 58, 59, FizzBuzz, 61, 62, Fizz, 64, Buzz, Fizz, 67, 68, Fizz, Buzz, 71, Fizz, 73, 74, FizzBuzz, 76, 77, Fizz, 79, Buzz, Fizz, 82, 83, Fizz, Buzz, 86, Fizz, 88, 89, FizzBuzz, 91, 92, Fizz, 94, Buzz, Fizz, 97, 98, Fizz, Buzz,
何の変哲もない FizzBuzz が表示されました。
処理の流れ
Observable でデータソースの指定
Observable.range で 1 から 100 までの int 値を要素に持つ Observable オブジェクトを作ります。
Operator でデータ操作
Java8 の Stream API と同じ名前のメソッドと同じ操作ができるような感じを受けます。もちろん Lambda で記述可能です。
Observable#subscribe で処理を決定&実行
Observable#subscribe で onNext/onError/onComplete を実装します。このメソッドを呼び出すことにより、初めて処理が実行されます。
Observable を自作する
RxJava は登場してからすでに数年経過しているライブラリなので、資料も大量に存在します。ただ、入門向けの記事だと Observable の just や from や range を使った簡単なサンプルが多く(入門記事なので当たり前と言えばそうですが)、それらで作成できる Observable は作成時にすべてのデータがそろっていなければいけません。固定値しか扱えない Observable では、逐次データが発生するケースで力を発揮する RxJava の本領からは程遠いものになってしまいます。
各自のアプリケーション開発に RxJava を取り入れるには、独自の Observable を定義することが必須のようです。というより、独自定義の Observable を使用しないなら、 RxJava はちょっと変な Stream API 程度のものでしかなくて、それをわかっていないとこのような記事を書いてしまいます。
手順
- rx.Subscriber を引数に持つコールバックメソッドを定義
- 定期的に isUnsubscribed を呼び出して終了を確認
- データを1つずつ onNext
- 処理の流れが正常に完了したなら onComplete()
- 処理の流れが途中で異常終了したなら onError()
実装例
FizzBuzz 用の Observable を置き換えるものを書いてみます。
コード全体
import rx.Observable;
public class OnSubscribeImplementation {
public static final void main(final String[] args) {
final Observable<String> observable = Observable.<Integer>create((sub) -> {
for (int i = 1; i <= 100; i++) {
sub.onNext(i);
}
sub.onCompleted();
})
.map(i -> {
if (i % 15 == 0) {
return "FizzBuzz";
}
if (i % 3 == 0) {
return "Fizz";
}
if (i % 5 == 0) {
return "Buzz";
}
return Integer.toString(i);
})
.map(i -> i + ", ");
observable.subscribe(System.out::println);
}
}
Observable#create の引数で要求される OnSubscribe は call(Subscriber sub) メソッドだけを Override すればよいので、Lambda 式で定義可能です。今回は単純に1から100までの整数値を次に送るだけのものを定義しています。
Observable.<Integer>create((sub) -> {
for (int i = 1; i <= 100; i++) {
sub.onNext(i);
}
sub.onCompleted();
})
onNext で発生したデータを次に送り、すべてのデータを送り終わったら onCompleted() を呼び出します。
subscribe で実行
定義した Observable オブジェクトは subscribe した時に初めて実行されます。subscribe は引数なしでも呼び出せますし、何かしらの操作をさせたいなら Action0 (RxJava の関数型インタフェイスの末尾についている数字は引数の数を示す)のインスタンスないし Lambda を渡すことができます。
observable.subscribe(System.out::println);
Observable オブジェクトは subscribe した時に初めて実行される
つまり、こういうことです。
// 前略
.map(i -> i + ", ");
System.out.println("ぬるぽぬるぽぬるぽ");// 追加
observable.subscribe(System.out::println);
ぬるぽぬるぽぬるぽ
1,
2,
Fizz,
4,
……後略……
forEach
subscribe メソッドを forEach に変えても同じことは可能です。
observable.forEach(System.out::println);
Observable 生成メソッドを分離
こんな感じで定義します。
private static Observable<Integer> makeObservable() {
return Observable.create((sub) -> {
for (int i = 1; i <= 100; i++) {
sub.onNext(i);
}
sub.onCompleted();
});
}
Observable#create の呼び出しの途中にジェネリクスを書かなくて済むのが利点です。
final Observable<String> observable = makeObservable()
.map(i -> {
まとめ
Observable を自分の実装したい処理に合わせて実装し直すことが RxJava では重要です。下記の3点を意識して実装していくとよいでしょう。
- 逐次発生するデータは何か?
- どんなデータを選択するのが適切か?
- 発生したデータに対しどう処理するか?
参考
Web
- RxJava(GitHub repository)
- RxJava Advent Calendar 2015
- 明日から使えるRxjava頻出パターン (Droid kaigi 2016) ……RxJava での非同期API処理に絞った解説です。
- 7つのサンプルプログラムで学ぶRxJavaの挙動
- 何となくRxJavaを使ってるけど正直よく分かってない人が読むと良さそうな記事・基礎編
- RxJavaの便利なOperator達
- RxJava学習のベストプラクティス
書籍
- WEB+DB PRESS vol.81「Javaの鉱脈」(p130-134)
(おまけ) Rx ファイルウォッチャー
Observable を自作するケース、実際にどのようなものになるのかを確認したかったので実装してみました。コード全体は Gist を参照してください。
makeFileWatcher()
private static Observable<Path> makeFileWatcher() {
return Observable.create((sub) -> {
Runtime.getRuntime().addShutdownHook(new Thread(() -> sub.onCompleted()));
while (true) {
System.out.println("Start check last modified.");
final File backup = new File("backup");
if (!backup.exists() || !backup.isDirectory()) {
System.out.println("make backup dir.");
backup.mkdir();
}
FILES
.entrySet().stream()
.filter(entry -> {
try {
final long ms = Files.getLastModifiedTime(entry.getKey()).toMillis();
return entry.getValue() < ms;
} catch (final Exception e) {
sub.onError(e);
}
return false;
})
.forEach(entry -> {
try {
final long ms = Files.getLastModifiedTime(entry.getKey()).toMillis();
FILES.put(entry.getKey(), ms);
} catch (final Exception e) {
sub.onError(e);
}
sub.onNext(entry.getKey());
});
try {
System.out.printf("Observable sleeping %dms\n", BACKUP_INTERVAL);
Thread.sleep(BACKUP_INTERVAL);
} catch (final InterruptedException e) {
sub.onError(e);
}
}
});
}
かいつまむと、filter で残った Path を forEach メソッドの Lambda 内で onNext により送っています。main メソッド側でそれらの Path に対する処理を記述します。
main method
Observable から送られてくる Path に対する処理を記述します。
また、今回はプログラムを停止させないために Sleep させました。GUI等であればそれらの処理は不要です。
makeFileWatcher()
.subscribeOn(Schedulers.newThread())
.subscribe(path -> {
System.out.println(LocalDateTime.now().toString() + " " + path.toString());
});
実行
Sleep 5000ms
Start check last modified.
Observable sleeping 5000ms
Sleep 5000ms
Start check last modified.
Observable sleeping 5000ms
Sleep 5000ms
Start check last modified.
2016-06-20T19:58:14.503 FileA.txt
Observable sleeping 5000ms
Sleep 5000ms