LoginSignup
10
14

More than 5 years have passed since last update.

RxJavaでOperators再入門 その1

Posted at

この記事は RxJava Advent Calendar 2016 の10目の記事です。

勢いでRxJava Advent Calendarに突っ込んだんですが、何書こうか迷ってて、
そういえば自分はにわかだな、と思ってOperatorsをもう一回勉強しようと思った次第です。

その1ってなってるのは多いので分割しようと思ったからで、全部で5回にわける予定です。

それでは1回目は ReactiveX Introduction に書いてある Creating Observables, Transforming Observables の2つについて見ていきます。

Creating

主に新しい Observable を生み出すオペレーター達

Create

スクラッチで Observable を作るオペレーターです。
Androidだと非同期通信なんかでよくみるやつですね。

使い方は以下のような感じです。

public class Create {
    public static void main(String[] args) {
        Observable.create(e -> {
            Person person = new Person();
            person.age = 100;
            person.name = "nshiba";
            e.onNext(person);
            e.onComplete();
        }).subscribe(System.out::println);
    }

    private static class Person {
        int age;
        String name;

        @Override
        public String toString() {
            return name + ":" + String.valueOf(age);
        }
    }
}

出力

nshiba:100

onNext で値を渡してあげて、終了時に onComplete を呼ぶ感じです。

また、エラーの場合には onError を呼びます。

Defer

defer は実行させる Observablesubscribe したときに作成するオペレーターです。

通常の create はその場で実行する Observable を作成しますが、 deferObservable の作成自体を遅延させます。

Observable observable = Observable.defer(() -> observer -> {
    observer.onNext("test");
    observer.onComplete();
});

// この瞬間にdeferの中で新しく Observable を作成することができる
observable.subscribe(System.out::println);

Empty/Never/Throw

これらのOperatorたちは限られた用途で主にテスト用として使われると思います。

Empty

何も値はないが正常に終了する Observable をつくります。
つまり onComplete しか呼ばれません。

Never

何も値がなく、終了もしない Observable をつくります。

Throw

何も値はないが、指定したエラーをはいて終了する Observable をつくります。

From

様々なオブジェクトをObservableに変換します。
おそらくリストを変換することが多いと思うのでサンプルは fromArray で作ってみました。

int[] nums = new int[] {1, 2, 3, 4, 5};
Observable
        .fromArray(nums)
        .subscribe(ints -> {
            System.out.println("onNext");
            System.out.println(Arrays.toString(ints));
        },
        throwable -> {
            System.out.println("onError");
        },
        () -> {
            System.out.println("onComplete");
        });

出力

onNext
[1, 2, 3, 4, 5]
onComplete

Interval

指定した一定間隔で整数の値を出力する Observable を生成します。
最初にどれくらい遅延させるかの指定も可能です。

Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::print);

出力

01234567789...

Just

直接引数に渡したオブジェクトで Observable を生成します。
また、複数渡した場合はその分 onNext が呼ばれ、複数渡す場合は型が統一されてなくてもエラーになりません。

Observable.just(3, 1, 5, 4, "test")
        .subscribe(num -> {
            System.out.println("onNext: " + num);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

出力

onNext: 3
onNext: 1
onNext: 5
onNext: 4
onNext: test
onComplete

Range

指定した範囲の整数を出力する Observable を生成します。

Observable.range(0, 10)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

出力

onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onComplete

Repeat

指定回数繰り返す Observable を生成します。

Observable.just(1, 2, 3, 4, 5)
        .repeat(3)
        .subscribe(i -> {
            System.out.println("onNext: " + i);
        }, throwable -> {
            System.out.println("onError");
        }, () -> {
            System.out.println("onComplete");
        });

出力

onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onComplete

Start

なんか計算した値などを返せるメソッドの戻り値を出力する Observable を作成します。
Create にちょっと似てる部分はありますが、こちらは戻り値が任意の値であって、 onNext , onComplete は呼ばないものです。

Observable.fromCallable(() -> {
    String str = "java";
    str += ":" + "RxJava";
    return str;
}).subscribe(System.out::println);

出力

java:RxJava

Timer

指定した時間分の遅延後に値が出力する Observable を作成します。

System.out.println(System.currentTimeMillis());
Observable.timer(3, TimeUnit.SECONDS)
        .subscribe(aLong -> {
            System.out.println(System.currentTimeMillis());
        });

出力

1480975677330
1480975680651

Transforming

Buffer

指定した間隔でストリームを分割してリストを作成するオペレーター。

Observable.range(1, 5)
        .buffer(3)
        .subscribe(System.out::println);

出力

[1, 2, 3]
[4, 5]

FlatMap

ストリームに流れてきたものを処理してから、新しい Observable に合成するオペレーター。

Observable.just(1, 2, 3)
        .flatMap(i -> Observable.range(i, i * 2))
        .subscribe(System.out::print);

出力

122345345678

GroupBy

ストリームを条件に従ってグループに分けるオペレーター。
同じグループにしたいものに同じ値を返すと同じグループになります。

Observable.range(1, 10)
        .groupBy(integer -> integer % 3)
        .subscribe(integerIntegerGroupedObservable -> {
            integerIntegerGroupedObservable.toList().subscribe(System.out::println);
        });

出力

[3, 6, 9]
[1, 4, 7, 10]
[2, 5, 8]

Map

ストリームに流れてきた値を変化させる事ができるオペレーター。
前述の FlatMap との違いは、 FlatMapObservable を返して、 Map は値そのものを返します。

Observable.just(1,2,3)
        .map(i -> i * 10)
        .subscribe(System.out::println);

出力

10
20
30

Scan

リストに順次アクセスしていくOperator。2つずつアクセスしていく。
最初は1つ目と2つ目が引数に渡され、それ以降は前回の処理で戻り値に渡された値が第一引数、次の値が第2引数に渡される。
subscribe 時に渡される値は要素の1つ目 + 戻り値に渡された値。

Observable.range(1, 5)
        .scan((sum, item) -> sum + item)
        .subscribe(System.out::println);

出力

1
3
6
10
15

Window

指定した間隔でストリームを分割して、分割したストリームで新たな Observable を作成するオペレーター。
前述の Buffer と似ているが、 BufferList<Integer> を出力するのに対し、 WindowObservable<Integer> を出力する。

Observable.range(1,5)
        .window(3)
        .subscribe(integerObservable -> {
            integerObservable.toList().subscribe(System.out::println);
        });

出力

[1, 2, 3]
[4, 5]

最後に

こんな感じで他のもやっていけたらと思ってます。
ソースコードは公開しています -> nshiba/rx-samples

また、なにか間違っている点などありましたら、コメントやgithubのissueなどに書いてもらえると嬉しいです。

10
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
14