66
59

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Observable を実装して RxJava に入門し直す

Last updated at Posted at 2016-06-20

概要

「何か流行っているけどよくわからない」程度の認識だった RxJava について、先日『WEB+DB PRESS』 vol.81 の「Javaの鉱脈」を読んで、**「実装したい処理に合わせて Observable を適切に独自定義する」**ことが RxJava を考える上で重要だと気付きました。RxJava をどういう風に使うものなのか、自分なりの理解ができたので書いてみます。

実行環境

Java SE 8で動かします。もちろん Lambda 式を使います。ご了承ください。

Java SE 1.8.0_91
Eclipse Mars 4.5.2
RxJava 1.1.0

ソースコード

今回のソースコードは GitHub に置きました。
ソースコード(GitHub)


RxJava とは?

リアクティヴプログラミング(後述)を Java でやるためのライブラリです。Rx とは Reactive Extensions の略だそうです。なお、Java での Reactive Extensions 実装はほかに Reactor Core というものがあるそうです。詳しくは「Reactor Core 2.5: もう一つのJava向けReactive Extensions実装」をご覧ください。

そもそもリアクティヴプログラミングとは?

連続的なデータをイベントハンドラで処理していくプログラミングスタイル
『WEB+DB PRESS』 vol.81 「Javaの鉱脈」 より

???

で、Rx すると何がよいのか?

下記3つを宣言的に記述可能です。

  1. 逐次発生するデータの観測
  2. 適切なデータの選択
  3. 発生したデータに対する処理

自分の実装したい処理に RxJava が適しているのかを考える際は上記3点を用いるとよいでしょう。私が考え付いた例を挙げてみます。

(例) ファイルウォッチャー

Markdown エディタにて、ローカルの Markdown ファイル更新を検知→プレビュー用 WebView をリロード

項目
逐次発生するデータ 更新されたファイルの Path オブジェクト
適切なデータの選択 ファイルであること、コピーが可能であること
発生したデータに対する処理 ファイルをコピー、プレビューの更新

(例) クローラー

項目
逐次発生するデータ リクエストURL
適切なデータの選択 適切なURLであること、robots.txt でクロールが許可されていること
発生したデータに対する処理 URL のコンテンツを取得、DBへの格納

(例) RxKafka

メッセージキュー Kafka にデータが溜まったら逐次 Consume して何かしらの処理をさせます。

項目
逐次発生するデータ Kafka のメッセージ
適切なデータの選択 データ形式が正当
発生したデータに対する処理 データのパース等

なお Scala にはそのようなライブラリがすでにあるようです。

  1. rx-kafka
  2. kafka-rx

RxJava の API

rx.Observable

RxJava の核ともいえる部分です。この1クラスだけで1万行近く(空行・コメント含む)あります。 RxJava を使う上で必要なメソッドが定義されています。

Observable の static factory method

いくつかありますが、実際は create をほぼ使うことになると思います。

Method name Variable
from オブジェクトの配列
just 10個までのオブジェクト(可変長引数ではない)
range int のみ、range(1,10) で1から10までの要素を持つ Observable を生成
create OnSubscribe から Observable を生成

イベント処理用のオペレータメソッド

Java8 の Stream API でも見かける filter や map という名前は、高階関数(Wikipedia)のオペレータとして標準的な名前のようです。主なものを下記に挙げます。Observable を返すのでメソッドチェインでつなげて記述することが可能です。

Operator name Description
filter(Func1 super T, Boolean> predicate) 残す値を決定
map(Func1 super T, ? extends R> func) 元の値を変更して次に送る。型も変更可能
cache() Observable をキャッシュして再利用可能にする
retry(final long count) 処理を複数回試行させる。long で最大回数を指定可能

RxJava が開発された当時は Java の標準に関数型プログラミングをするためのクラスが揃っていなかったので、独自に実装してあります。

実行するスレッドを変更するメソッド

処理を実行する Scheduler(rx.Scheduler) を指定します。

Method name Description
observeOn(Scheduler scheduler) observer の実行スレッドを選択、実行後から変更
subscribeOn(Scheduler scheduler) operator の実行スレッドを選択/ 最初に指定したスレッドを使う/ 途中で指定しても最初のOperatorから指定したスレッドを使う

rx.Scheduler

処理を実行するスレッドの実体を保持するもので、非同期処理を書く際に重要となります。Schedulers クラスの static factory method から選択します。

Scheduler factory method Description
immediate() 現在のスレッドですぐに動かす
trampoline() 現在のスレッドに処理をプールして徐々に動かす
newThread() 都度新しいスレッドで動かす
computation() キャッシュされたスレッドで動かす。I/O処理以外で使う
io() キャッシュされたスレッドで動かす。I/O処理で使う
from(Executor executor) 指定された Executor で動かす
test() デバッグ用

rx.Observer

イベントの処理をするインタフェイスです。

Method name Description Times
void onNext(T t) 要素1つ1つの処理を記述 複数回
void onCompleted() 終了する際の処理を記述 1回のみ
void onError(Throwable e) 例外発生時の処理を記述 1回のみ

実際のプログラミングでは Observable を生成する際に適宜定義するため、これを直接実装することはほぼありません。


クイックスタート

フォルダを作って移動

$ mkdir vRxJava
$ cd vRxJava

プロジェクトをgradleで初期化

vRxJava> $ gradle init --type java-library
 :wrapper
 :init

生成された build.gradle に RxJava の依存を追加

build.gradle
apply plugin:'application'

mainClassName = 'RxFizzBuzz'

repositories {
    mavenCentral()
}
dependencies {
    compile 'io.reactivex:rxjava:1.1.0'
}

依存を解決

 vRxJava> gradle dependencies
 Download https://jcenter.bintray.com/io/reactivex/rxjava/1.1.0/rxjava-1.1.0.pom
 Download https://jcenter.bintray.com/io/reactivex/rxjava/1.1.0/rxjava-1.1.0.jar
 :dependencies
 
 ------------------------------------------------------------
 Root project
 ------------------------------------------------------------
 
 archives - Configuration for archive artifacts.
 No dependencies
 
 compile - Compile classpath for source set 'main'.
 \--- io.reactivex:rxjava:1.1.0
 
 default - Configuration for default artifacts.
 \--- io.reactivex:rxjava:1.1.0
 
 runtime - Runtime classpath for source set 'main'.
 \--- io.reactivex:rxjava:1.1.0

試しにFizzBuzz する

本当に RxJava を使っているだけで全然リアクティヴでもなんでもないコードです。

RxFizzBuzz.java
import rx.Observable;

public class RxFizzBuzz {
    public static final void main(final String[] args) {
        Observable.range(1, 100)
           .map(i -> {
               if (i % 15 == 0) {
                   return "FizzBuzz";
               }
               if (i % 3  == 0) {
                   return "Fizz";
               }
               if (i % 5  == 0) {
                   return "Buzz";
               }
               return Integer.toString(i);
           })
           .subscribe(
                   (i) -> {System.out.print(i + ", ");},
                   (e) -> e.printStackTrace(),
                   System.out::println
                   );
    }
}

実行

\>gradle run
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:run
1, 2, Fizz, 4, Buzz, Fizz, 7, 8, Fizz, Buzz, 11, Fizz, 13, 14, FizzBuzz, 16, 17, Fizz, 19, Buzz, Fizz, 22, 23, Fizz, Buzz, 26, Fizz, 28, 29, FizzBuzz, 31, 32, Fizz, 34, Buzz, Fizz, 37, 38, Fizz, Buzz, 41, Fizz, 43, 44, FizzBuzz, 46, 47, Fizz, 49, Buzz, Fizz, 52, 53, Fizz, Buzz, 56, Fizz, 58, 59, FizzBuzz, 61, 62, Fizz, 64, Buzz, Fizz, 67, 68, Fizz, Buzz, 71, Fizz, 73, 74, FizzBuzz, 76, 77, Fizz, 79, Buzz, Fizz, 82, 83, Fizz, Buzz, 86, Fizz, 88, 89, FizzBuzz, 91, 92, Fizz, 94, Buzz, Fizz, 97, 98, Fizz, Buzz, 

何の変哲もない FizzBuzz が表示されました。

処理の流れ

Observable でデータソースの指定

Observable.range で 1 から 100 までの int 値を要素に持つ Observable オブジェクトを作ります。

Operator でデータ操作

Java8 の Stream API と同じ名前のメソッドと同じ操作ができるような感じを受けます。もちろん Lambda で記述可能です。

Observable#subscribe で処理を決定&実行

Observable#subscribe で onNext/onError/onComplete を実装します。このメソッドを呼び出すことにより、初めて処理が実行されます。


Observable を自作する

RxJava は登場してからすでに数年経過しているライブラリなので、資料も大量に存在します。ただ、入門向けの記事だと Observable の just や from や range を使った簡単なサンプルが多く(入門記事なので当たり前と言えばそうですが)、それらで作成できる Observable は作成時にすべてのデータがそろっていなければいけません。固定値しか扱えない Observable では、逐次データが発生するケースで力を発揮する RxJava の本領からは程遠いものになってしまいます。

各自のアプリケーション開発に RxJava を取り入れるには、独自の Observable を定義することが必須のようです。というより、独自定義の Observable を使用しないなら、 RxJava はちょっと変な Stream API 程度のものでしかなくて、それをわかっていないとこのような記事を書いてしまいます。

手順

  1. rx.Subscriber を引数に持つコールバックメソッドを定義
  2. 定期的に isUnsubscribed を呼び出して終了を確認
  3. データを1つずつ onNext
  4. 処理の流れが正常に完了したなら onComplete()
  5. 処理の流れが途中で異常終了したなら onError()

実装例

FizzBuzz 用の Observable を置き換えるものを書いてみます。

コード全体

OnSubscribeImplementation.java
import rx.Observable;

public class OnSubscribeImplementation {
    public static final void main(final String[] args) {
        final Observable<String> observable = Observable.<Integer>create((sub) -> {
            for (int i = 1; i <= 100; i++) {
                sub.onNext(i);
            }
            sub.onCompleted();
        })
        .map(i -> {
            if (i % 15 == 0) {
                return "FizzBuzz";
            }
            if (i % 3  == 0) {
                return "Fizz";
            }
            if (i % 5  == 0) {
                return "Buzz";
            }
            return Integer.toString(i);
        })
        .map(i -> i + ", ");
        observable.subscribe(System.out::println);
    }
}

Observable#create の引数で要求される OnSubscribe は call(Subscriber sub) メソッドだけを Override すればよいので、Lambda 式で定義可能です。今回は単純に1から100までの整数値を次に送るだけのものを定義しています。

Observable.<Integer>create((sub) -> {
    for (int i = 1; i <= 100; i++) {
        sub.onNext(i);
    }
    sub.onCompleted();
})

onNext で発生したデータを次に送り、すべてのデータを送り終わったら onCompleted() を呼び出します。

subscribe で実行

定義した Observable オブジェクトは subscribe した時に初めて実行されます。subscribe は引数なしでも呼び出せますし、何かしらの操作をさせたいなら Action0 (RxJava の関数型インタフェイスの末尾についている数字は引数の数を示す)のインスタンスないし Lambda を渡すことができます。

observable.subscribe(System.out::println);

Observable オブジェクトは subscribe した時に初めて実行される

つまり、こういうことです。

// 前略
.map(i -> i + ", ");
System.out.println("ぬるぽぬるぽぬるぽ");// 追加
observable.subscribe(System.out::println);
実行結果(一部)
ぬるぽぬるぽぬるぽ
1, 
2, 
Fizz, 
4, 
……後略……

forEach

subscribe メソッドを forEach に変えても同じことは可能です。

observable.forEach(System.out::println);

Observable 生成メソッドを分離

こんな感じで定義します。

makeObservable()
private static Observable<Integer> makeObservable() {
    return Observable.create((sub) -> {
        for (int i = 1; i <= 100; i++) {
            sub.onNext(i);
        }
        sub.onCompleted();
    });
}

Observable#create の呼び出しの途中にジェネリクスを書かなくて済むのが利点です。

置き換え
final Observable<String> observable = makeObservable()
        .map(i -> {

まとめ

Observable を自分の実装したい処理に合わせて実装し直すことが RxJava では重要です。下記の3点を意識して実装していくとよいでしょう。

  1. 逐次発生するデータは何か?
  2. どんなデータを選択するのが適切か?
  3. 発生したデータに対しどう処理するか?

参考

Web

  1. RxJava(GitHub repository)
  2. RxJava Advent Calendar 2015
  3. 明日から使えるRxjava頻出パターン (Droid kaigi 2016) ……RxJava での非同期API処理に絞った解説です。
  4. 7つのサンプルプログラムで学ぶRxJavaの挙動
  5. 何となくRxJavaを使ってるけど正直よく分かってない人が読むと良さそうな記事・基礎編
  6. RxJavaの便利なOperator達
  7. RxJava学習のベストプラクティス

書籍

  1. WEB+DB PRESS vol.81「Javaの鉱脈」(p130-134)

(おまけ) Rx ファイルウォッチャー

Observable を自作するケース、実際にどのようなものになるのかを確認したかったので実装してみました。コード全体は Gist を参照してください。

makeFileWatcher()

makeFileWatcher()
private static Observable<Path> makeFileWatcher() {
        return Observable.create((sub) -> {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> sub.onCompleted()));
            while (true) {
                System.out.println("Start check last modified.");
                final File backup = new File("backup");
                if (!backup.exists() || !backup.isDirectory()) {
                    System.out.println("make backup dir.");
                    backup.mkdir();
                }
                FILES
                    .entrySet().stream()
                    .filter(entry -> {
                        try {
                            final long ms = Files.getLastModifiedTime(entry.getKey()).toMillis();
                            return entry.getValue() < ms;
                        } catch (final Exception e) {
                            sub.onError(e);
                        }
                        return false;
                    })
                    .forEach(entry -> {
                        try {
                            final long ms = Files.getLastModifiedTime(entry.getKey()).toMillis();
                            FILES.put(entry.getKey(), ms);
                        } catch (final Exception e) {
                            sub.onError(e);
                        }
                        sub.onNext(entry.getKey());
                    });
                try {
                    System.out.printf("Observable sleeping %dms\n", BACKUP_INTERVAL);
                    Thread.sleep(BACKUP_INTERVAL);
                } catch (final InterruptedException e) {
                    sub.onError(e);
                }
            }
        });
}

かいつまむと、filter で残った Path を forEach メソッドの Lambda 内で onNext により送っています。main メソッド側でそれらの Path に対する処理を記述します。

main method

Observable から送られてくる Path に対する処理を記述します。

また、今回はプログラムを停止させないために Sleep させました。GUI等であればそれらの処理は不要です。

use_observable
makeFileWatcher()
    .subscribeOn(Schedulers.newThread())
    .subscribe(path -> {
            System.out.println(LocalDateTime.now().toString() + " " + path.toString());
        });

実行

出力
Sleep 5000ms
Start check last modified.
Observable sleeping 5000ms
Sleep 5000ms
Start check last modified.
Observable sleeping 5000ms
Sleep 5000ms
Start check last modified.
2016-06-20T19:58:14.503 FileA.txt
Observable sleeping 5000ms
Sleep 5000ms
66
59
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
66
59

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?