この記事は RxJava Advent Calendar 2016 の10目の記事です。
勢いでRxJava Advent Calendarに突っ込んだんですが、何書こうか迷ってて、
そういえば自分はにわかだな、と思ってOperatorsをもう一回勉強しようと思った次第です。
その1ってなってるのは多いので分割しようと思ったからで、全部で5回にわける予定です。
それでは1回目は ReactiveX Introduction に書いてある Creating Observables, Transforming Observables の2つについて見ていきます。
Creating
主に新しい Observable
を生み出すオペレーター達
Create
スクラッチで Observable
を作るオペレーターです。
Androidだと非同期通信なんかでよくみるやつですね。
使い方は以下のような感じです。
public class Create {
public static void main(String[] args) {
Observable.create(e -> {
Person person = new Person();
person.age = 100;
person.name = "nshiba";
e.onNext(person);
e.onComplete();
}).subscribe(System.out::println);
}
private static class Person {
int age;
String name;
@Override
public String toString() {
return name + ":" + String.valueOf(age);
}
}
}
出力
nshiba:100
onNext
で値を渡してあげて、終了時に onComplete
を呼ぶ感じです。
また、エラーの場合には onError
を呼びます。
Defer
defer
は実行させる Observable
を subscribe
したときに作成するオペレーターです。
通常の create
はその場で実行する Observable
を作成しますが、 defer
は Observable
の作成自体を遅延させます。
Observable observable = Observable.defer(() -> observer -> {
observer.onNext("test");
observer.onComplete();
});
// この瞬間にdeferの中で新しく Observable を作成することができる
observable.subscribe(System.out::println);
Empty/Never/Throw
これらのOperatorたちは限られた用途で主にテスト用として使われると思います。
Empty
何も値はないが正常に終了する Observable
をつくります。
つまり onComplete
しか呼ばれません。
Never
何も値がなく、終了もしない Observable
をつくります。
Throw
何も値はないが、指定したエラーをはいて終了する Observable
をつくります。
From
様々なオブジェクトをObservableに変換します。
おそらくリストを変換することが多いと思うのでサンプルは fromArray
で作ってみました。
int[] nums = new int[] {1, 2, 3, 4, 5};
Observable
.fromArray(nums)
.subscribe(ints -> {
System.out.println("onNext");
System.out.println(Arrays.toString(ints));
},
throwable -> {
System.out.println("onError");
},
() -> {
System.out.println("onComplete");
});
出力
onNext
[1, 2, 3, 4, 5]
onComplete
Interval
指定した一定間隔で整数の値を出力する Observable
を生成します。
最初にどれくらい遅延させるかの指定も可能です。
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::print);
出力
01234567789...
Just
直接引数に渡したオブジェクトで Observable
を生成します。
また、複数渡した場合はその分 onNext
が呼ばれ、複数渡す場合は型が統一されてなくてもエラーになりません。
Observable.just(3, 1, 5, 4, "test")
.subscribe(num -> {
System.out.println("onNext: " + num);
}, throwable -> {
System.out.println("onError");
}, () -> {
System.out.println("onComplete");
});
出力
onNext: 3
onNext: 1
onNext: 5
onNext: 4
onNext: test
onComplete
Range
指定した範囲の整数を出力する Observable
を生成します。
Observable.range(0, 10)
.subscribe(i -> {
System.out.println("onNext: " + i);
}, throwable -> {
System.out.println("onError");
}, () -> {
System.out.println("onComplete");
});
出力
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onComplete
Repeat
指定回数繰り返す Observable
を生成します。
Observable.just(1, 2, 3, 4, 5)
.repeat(3)
.subscribe(i -> {
System.out.println("onNext: " + i);
}, throwable -> {
System.out.println("onError");
}, () -> {
System.out.println("onComplete");
});
出力
onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onComplete
Start
なんか計算した値などを返せるメソッドの戻り値を出力する Observable
を作成します。
Create
にちょっと似てる部分はありますが、こちらは戻り値が任意の値であって、 onNext
, onComplete
は呼ばないものです。
Observable.fromCallable(() -> {
String str = "java";
str += ":" + "RxJava";
return str;
}).subscribe(System.out::println);
出力
java:RxJava
Timer
指定した時間分の遅延後に値が出力する Observable
を作成します。
System.out.println(System.currentTimeMillis());
Observable.timer(3, TimeUnit.SECONDS)
.subscribe(aLong -> {
System.out.println(System.currentTimeMillis());
});
出力
1480975677330
1480975680651
Transforming
Buffer
指定した間隔でストリームを分割してリストを作成するオペレーター。
Observable.range(1, 5)
.buffer(3)
.subscribe(System.out::println);
出力
[1, 2, 3]
[4, 5]
FlatMap
ストリームに流れてきたものを処理してから、新しい Observable
に合成するオペレーター。
Observable.just(1, 2, 3)
.flatMap(i -> Observable.range(i, i * 2))
.subscribe(System.out::print);
出力
122345345678
GroupBy
ストリームを条件に従ってグループに分けるオペレーター。
同じグループにしたいものに同じ値を返すと同じグループになります。
Observable.range(1, 10)
.groupBy(integer -> integer % 3)
.subscribe(integerIntegerGroupedObservable -> {
integerIntegerGroupedObservable.toList().subscribe(System.out::println);
});
出力
[3, 6, 9]
[1, 4, 7, 10]
[2, 5, 8]
Map
ストリームに流れてきた値を変化させる事ができるオペレーター。
前述の FlatMap
との違いは、 FlatMap
は Observable
を返して、 Map
は値そのものを返します。
Observable.just(1,2,3)
.map(i -> i * 10)
.subscribe(System.out::println);
出力
10
20
30
Scan
リストに順次アクセスしていくOperator。2つずつアクセスしていく。
最初は1つ目と2つ目が引数に渡され、それ以降は前回の処理で戻り値に渡された値が第一引数、次の値が第2引数に渡される。
subscribe
時に渡される値は要素の1つ目 + 戻り値に渡された値。
Observable.range(1, 5)
.scan((sum, item) -> sum + item)
.subscribe(System.out::println);
出力
1
3
6
10
15
Window
指定した間隔でストリームを分割して、分割したストリームで新たな Observable
を作成するオペレーター。
前述の Buffer
と似ているが、 Buffer
は List<Integer>
を出力するのに対し、 Window
は Observable<Integer>
を出力する。
Observable.range(1,5)
.window(3)
.subscribe(integerObservable -> {
integerObservable.toList().subscribe(System.out::println);
});
出力
[1, 2, 3]
[4, 5]
最後に
こんな感じで他のもやっていけたらと思ってます。
ソースコードは公開しています -> nshiba/rx-samples
また、なにか間違っている点などありましたら、コメントやgithubのissueなどに書いてもらえると嬉しいです。