LoginSignup
359
342

More than 1 year has passed since last update.

非同期や並列処理にも役立つRxJavaの使い方

Last updated at Posted at 2015-10-25

なぜRxJava?

RxJavaは様々な特性を併せ持ったライブラリですが、簡単にまとめると以下のような機能に分類されます。

  • List処理の抽象化・ストリーム化
  • Optional
  • Future/Promise
  • Data Binding
  • Event Bus

Android開発でRxJavaをチームに導入した話

Java 8ではStream APIやOptionalが導入されていますが、Androidや業務要件などそのAPIを使えない環境も存在します。
また、非同期や並列などそもそもJavaで扱いにくい処理を、統一されたインターフェイスで簡潔に記述できるなどのメリットも多く、その基本的な機能を試してみたので紹介します。

なお、ここに記載する内容はRxJavaの使い方が中心で、RxJava自体の概念やFRPなどについては、他にも多くの方々紹介してくださっていますので、そちらをご覧いただければと思います。

また、今回のサンプルコードは単純化のためにfromやjust,rangeなど生成済みのストリームを使用していますが、本来Rxは時間的に変化するストリームも扱うことができるので、実運用で使用するときにはその辺りも考慮して使用する必要があります。

Use Case

細かな機能からだと、メリットがわかりにくくなるので、まずはいくつかのユースケースを紹介したいと思います。

Stream APIのようにCollectionを操作する

感覚的に理解しやすい同期的にCollectionの操作を行う例。

List<Integer> res = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6)) // IterableなオブジェクトからObservableを生成
        .flatMap(Observable::just)        // Stream APIのflatMap相当
        .filter(i -> i % 2 == 0)          // Stream APIのfilter相当
        .map(i -> i * 2)                  // Stream APIのmap相当
        .skip(1)                          // Stream APIのskip相当
        .take(3)                          // Stream APIのlimit相当 ※同様の処理のlimit()も存在する
        .toList()                         // ストリームをListに変換
        .toBlocking()                     // 同期処理で行う
        .single();                        // 要素を取得

System.out.println(res); // [8, 12]

ファクトリとなるfrom()と同期的にListの変換を行うtoList().toBlocking().single()部分の記述に違いはありますが、ほとんどJava 8のStream APIと同じような感覚で操作が出来ます。

Optionalのように扱う

RxJavaの中心となっているObservableはOptionalに似た性質も持っているので、それを利用してOptionalのように扱うことも出来ます。
そのまま記述すると冗長になるので、今回はObservablesというユーティリティを作成しています。

@Test
public void Optionalのように扱う() throws Exception {

    Observable<String> exist = Observables.ofNullable("val");
    Observable<Object> empty = Observables.ofNullable(null);

    // Optional#ifPresent
    exist.subscribe(System.out::println); // val
    empty.subscribe(System.out::println); // 表示処理自体も呼ばれない

    // Optional#flatMap, map, filter
    Observables.ofNullable("1,2,3,4")
            .flatMap(str-> Observable.from(asList(str.split(","))))
            .map(Integer::parseInt)
            .filter(i -> i % 2 == 0)
            .subscribe(System.out::println);

    // Optional#get
    Observables.get(exist); // val
    Observables.get(empty); // throw NoSuchElementException

    // Optional#orElse
    Observables.orElse(exist, "default"); // val
    Observables.orElse(empty, "default"); // default

    // Optional#isPresent
    Observables.isPresent(exist); // true
    Observables.isPresent(empty); // false
}

class Observables {
    static <T> Observable<T> of(T value){
        if(value == null){
            throw new NullPointerException();
        }
        else{
            return Observable.just(value);
        }
    }

    static <T> Observable<T> ofNullable(T value){
        return value == null ? Observable.empty() : Observable.just(value);
    }

    static <T> T get(Observable<T> observable){
        return observable.toBlocking().single();
    }

    static <T> T orElse(Observable<T> observable, T other){
        return observable.defaultIfEmpty(other).toBlocking().single();
    }

    static <T> boolean isPresent(Observable<T> observable){
        return !observable.isEmpty().toBlocking().single();
    }

    static <T> T orElseGet(Observable<T> observable, Func0<T> other) {
        return isPresent(observable) ? get(observable) : other.call();
    }

    static <T, X extends Throwable> T orElseThrow(Observable<T> observable, Func0<? extends X> other) throws X {
        if (isPresent(observable)) {
            return get(observable);
        } else {
            throw other.call();
        }
    }
}

Future/Promiseのように扱う

Observableは実行するスレッドを切り替えるためのメソッドを用意しているので、Web APIやDBアクセスなど重い処理を非同期化し合成することができます。
ObservableとFuture/Promiseとの違いは、Request/Responseのような一つの要素だけではなくイベントのような複数発生する要素をを扱えることです。
今回はFuture/Promiseと同様にObservableに単一の要素を追加して扱います。

@Test
public void FuturePromiseのように扱う() throws Exception {

    // 直列+並列に処理する例
    // 最初にユーザの認証情報を取得し、その認証情報を元に並列にユーザ情報を取得する例
    Stopwatch sw = Stopwatch.createStarted();
    getAuthId("id", "pass")
            .flatMap(id -> Observable.zip(getUserInfo(id), getUserItems(id),
                    (info, items) -> info + " has items = " + items))
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("All time: " + sw.stop());
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
            });

    // Outpuot:
    //
    // getAuthId, ThreadName: RxCachedThreadScheduler-1, Params:id,pass
    // getUserInfo, ThreadName: RxCachedThreadScheduler-2, Params: 123
    // getUserItems, ThreadName: RxCachedThreadScheduler-3, Params: 123
    // Tom has items = [Apple, Banana]
    // All time: 4.191 s
}

Observable<Integer> getAuthId(String id, String pass) {
    return Observable.create((Subscriber<? super Integer> subscriber) -> {
        // 本来以下はWebやDBアクセスなど
        try {
            System.out.println(String.format("getAuthId, ThreadName: %s, Params:%s,%s", Thread.currentThread().getName(), id, pass));
            Threads.sleep(1000);
            subscriber.onNext(123);
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

Observable<String> getUserInfo(Integer authId) {
    return Observable.create((Subscriber<? super String> subscriber) -> {
        // 本来以下はWebやDBアクセスなど
        try {
            System.out.println(String.format("getUserInfo, ThreadName: %s, Params: %d", Thread.currentThread().getName(), authId));
            Threads.sleep(2000);
            subscriber.onNext("Tom");
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

Observable<List<String>> getUserItems(Integer authId) {
    return Observable.create((Subscriber<? super List<String>> subscriber) -> {
        // 本来以下はWebやDBアクセスなど
        try {
            System.out.println(String.format("getUserItems, ThreadName: %s, Params: %d", Thread.currentThread().getName(), authId));
            Threads.sleep(3000);
            subscriber.onNext(asList("Apple", "Banana"));
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

各メソッドの処理時間が1+2+3なので6秒かかるはずの処理が並列に呼び出しを行っているので4秒強で完了します。
またこういった非同期、並列処理をHttp経由で行う際には、Retrofitと組み合わせることでより簡潔に記述できます。

クライアントサイドだと更に、Data BindingやEvent Busなどの機能も有効に活用できると思います。

RxJavaのAPI

RxJavaには非常に豊富なAPIが用意されているので、ここからはそのAPIをいくつか紹介します。

Observableの生成

RxJavaには多くのファクトリが用意されており、これらを使用することで様々なデータからObservableを生成することができます。
Creating Observables

from

IterableなオブジェクトからObservableを生成。
その他にも配列やFutureなど多くの引数からObservableを生成するオーバーロードが存在する。

Observable<List<Integer>> o = Observable.from(list);

create

OnSubscribeを指定してObservableを生成。

Observable<String> o = Observable.create(subscriber -> {
		    subscriber.onNext("Hello");
		    subscriber.onNext("world!");
		    subscriber.onCompleted();
		});

range

要素の範囲を指定してObservableを生成。

Observable<Integer> o = Observable.range(1, 5);

just

引数で直接Observableを生成

Observable<Integer> o = Observable.just(1, 2, 3);

error

例外を持ったObservableを生成

Observable<Object> o = Observable.error(new RuntimeException("Error!"));

empty

空のObservableを生成

Observable<Object> o = Observable.empty();

interval

非同期で指定間隔ごとにsubscribeする。
toBlockingなどしてもブロックされない。
takeなどで回数制限を行うことができる。

Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::print);

// 12345...

Observableの操作

生成したObservableには様々な操作を行うことができます。
StreamAPIのようにCollectionを操作するで記述した、map、flatMap、fileter、skip、takeなどもその一部です。
ここではその中でよく使いそうなものを紹介します。

map

ストリームに流れてくるアイテムを変換する。

Observable.range(1, 5)
        .map(i -> i * 2)
        .subscribe(System.out::print);

// 246810

flatMap

ストリームに流れてくるアイテムを変換し、合成する。

Observable.just(1, 2)
        .flatMap(i -> Observable.range(i, 3))
        .subscribe(System.out::print);

// 123234345

filter

条件に一致したアイテムだけを抽出する。

Observable.range(1, 5)
        .filter(i -> i % 2 == 0)
        .subscribe(System.out::print);

// 24

skip

指定した数のアイテムをスキップする。

Observable.range(1, 5)
        .skip(3)
        .subscribe(System.out::print);

// 345

take

先頭から指定された数のアイテムを抽出する。
同様の処理のlimitも存在する。

Observable.range(1, 5)
        .take(3)
        .subscribe(System.out::print);

// 123

reduce

アイテムの集約を行う。

Observable.range(1, 3)
        .reduce((sum, value) -> sum + value)
        .subscribe(System.out::print);

// 6

scan

前回のアイテムも引数に取るmap。畳み込まないreduce。

Observable.range(1, 3)
        .scan((sum, value) -> sum + value)
        .subscribe(System.out::print);

// 136

collect

ストリームのアイテムを集約する。

Observable.range(1, 5).collect(new ArrayList<>(), (li, i) -> li.add(i));
        .subscribe(System.out::println);

// [1, 2, 3, 4, 5]

merge

複数のObservableを合成。flatMapを使う必要がない。

Observable.merge(Observable.from(list1), Observable.from(list2))

concat

複数のObservableを合成。
mergeとconcatの違いはmergeはObservableを合成する際に全てのストリームの順を考慮して合成するのに対し、concatは引数で渡されたObservableの順で合成すること。

Observable.concat(Observable.from(list1), Observable.from(list2));

zip

渡されたストリームのアイテムが全て揃った時点で呼び出される。
mergeやconcatとの違いは、異なる型のObservableを扱えること。

Observable<Integer> o1 = Observable.just(1, 3, 5);
Observable<Integer> o2 = Observable.just(2, 4, 6);
Observable
        .zip(o1, o2, (d1, d2) -> d1 + " + " + d2 + " = " + (d1 + d2))
        .subscribe(System.out::println);

// 2 + 1 = 3
// 4 + 3 = 7
// 6 + 5 = 11

combineLatest

zip同様に異なる型のObservableを扱うが、各アイテムのonNextが呼ばれた時点でもっとも近いストリームのアイテムを扱う。

groupBy

Observableのグルーピングを行う。

Observable.range(1, 5)
        .groupBy(i -> i % 2)
        .subscribe(grouped -> {
            grouped.toList().subscribe(list -> System.out.println(format("key=%d,value=%s", grouped.getKey(), list)));
        });

// key=0,value=[2, 4]
// key=1,value=[1, 3, 5]

distinct

重複を排除する。
また重複方法を選択する distinct(Func1 keySelector) も存在する。

Observable.from(asList(1, 2, 3, 3, 4, 5, 5))
        .distinct()
        .subscribe(System.out::print);

// 12345

window

ストリームを束ねて、新たなストリームを生成する。
束ね方により複数のオーバーロードが存在する。

Observable.range(1, 5)
        .window(2)
        .subscribe(i -> i.toList().subscribe(System.out::print));

// [1, 2][3, 4][5]

buffer

指定した間隔でストリームを分割したObservableのListを生成する。
数値の他に時間や関数など複数の引数をとるオーバーロードが存在する。

Observable.range(1, 5)
        .buffer(2)
        .subscribe(System.out::print);

// [1, 2][3, 4][5]

repeat

onCompletedされたときに再度subscribeするオペレータ。
repeat処理を指定する、repeatWhenも存在する。

Random r = new Random();
Observable.create(s -> {
            s.onNext(r.nextInt());
            s.onCompleted();
        })
        .repeat(3)
        .subscribe(System.out::println);

// 1355134043
// 1691330796
// 1863470304

timeout

タイムアウトを設定し、タイムアウトした場合にはTimeoutExceptionが発生する

Observable.interval(3, TimeUnit.SECONDS)
        .timeout(1, TimeUnit.SECONDS)
        .onErrorReturn(e -> {
            System.out.println(e);
            return -1L;
        })
        .subscribe(System.out::println);

// java.util.concurrent.TimeoutException
// -1

非同期・マルチスレッド・Scheduler

非同期、並列処理などでスレッドの切り替えを行う際には、Schedulerを利用します。
ここでは標準で用意されているSchedulerとその使い方について記載します。
Scheduler

標準で用意されているSchedulers

Scheduler 動作
computation CPUバウンド用スレッドを生成する。データの加工など向け。コア数と同じ数のキャッシュされたスレッドを使ってイベント処理をする。I/O処理ってはいけない。
io I/Oバウンド用のスレッドを生成する。DBアクセスなど向け。1つだけキャッシュされているスレッドを使ってイベント処理を行う。
trampoline ThredLocalに処理をキューイングし現在実行中の処理が完了したら逐次実行する。
immediate 現在のスレッドを使ってイベントを処理する。処理を遅延することはできるがキューイングすることはできない。
newThread 作業単位ごとに新しいスレッドを作ってイベントを処理する。

subscribeOn

RxJavaの一連の処理を実行するスレッドを指定する。

Observable.range(1, 5)
        .subscribeOn(Schedulers.newThread())
        .subscribe(i -> System.out.print(Thread.currentThread().getName()));

// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1
// RxNewThreadScheduler-1

observeOn

Observerの実行するスレッドを指定する。

Observable.range(1, 3)
        .subscribeOn(Schedulers.newThread())
        .map(i -> {
            System.out.println("map: (" + Thread.currentThread().getName() + ")");
            return i * 2;
        })
        .observeOn(Schedulers.computation())
        .subscribe(i -> System.out.println("subscribe: " + i + " (" + Thread.currentThread().getName() + ")"));

// map: (RxNewThreadScheduler-1)
// map: (RxNewThreadScheduler-1)
// map: (RxNewThreadScheduler-1)
// subscribe: 2 (RxComputationThreadPool-3)
// subscribe: 4 (RxComputationThreadPool-3)
// subscribe: 6 (RxComputationThreadPool-3)

エラーハンドリング

RxJavaのメリットとして、通常処理単位で行わなければならないようなストリームの中の処理に対して、統一した記法でエラーハンドリングを行うことが出来ます。

onErrorReturn

例外が発生時の処理を行う。
同じ型のObservableを返り値とするonErrorResumeNextも存在する。

Observable.range(1, 5)
        .doOnNext(i -> {
            if (i == 2) {
                throw new RuntimeException("Error!!");
            }
        })
        .onErrorReturn(e -> {
            System.out.println(e.getMessage());
            return -1;
        })
        .subscribe(System.out::println);

// 1
// Error!!
// -1

retry

例外が発生しても指定の回数リトライを行う。
リトライ処理を細かく指定できるObservable#retryWhenも存在する。

Observable.range(1, 5)
        .doOnNext(i -> {
            if (new Random().nextInt(3) == 1) {
                throw new RuntimeException("Error!");
            }
        })
        .retry(1)
        .onErrorReturn(e -> {
            System.out.println(e);
            return -1;
        })
        .subscribe(System.out::println);

// 1
// 2
// 1
// 2
// 3
// java.lang.RuntimeException: Error!
// -1

まとめ

ここで紹介したAPIはほんの一部で、それ以外にもHot/Coldの性質やData Binding、Event Bus、Reactive Streams対応など多くの機能が存在するので、その辺りはまた時間が取れた時に追加していこうと思います。

参考

RxJava GitHub
RxJava Javadoc
Web+DB Press val.81 Javaの鉱脈
RxのHotとColdについて
Mastering observables
【翻訳】あなたが求めていたリアクティブプログラミング入門
RxJava Schedulers Use Cases
Rx逆引き

359
342
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
359
342