よく使うやつから絶対つかわねーだろみたいなやつ書きました。
たまーに見返して毎日をゆたかにしましょう。
もしよければこっちに基本的な使い方が書いてあるのでどうぞ
生成
create
[ReactiveX - Create operator][create]
別ObservableのsubscribeによってObservalbeを生成するオペレーター。
意味が分かれば図の通りなんだけど意味わかんないと思うのでサンプルコードを。
ではどういったときに使うかというと非同期処理等でエラーハンドリングが必要な際に使います。
では具体例を。
Observable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
FileInputStream fileInputStream;
fileInputStream = openFileInput("MyFile.txt");
byte[] readBytes = new byte[fileInputStream.available()];
fileInputStream.read(readBytes);
subscriber.onNext(new String(readBytes));
fileInputStream = openFileInput("2ndMyFile.txt");
byte[] readBytes2nd = new byte[fileInputStream.available()];
fileInputStream.read(readBytes2nd);
subscriber.onNext(new String(readBytes2nd));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}
こんな使い方するかどうかは別としてcreateを使うときは例外処理が発生するときだと思っています。
自分でsubscribe発火を制御する感じです。
公式サンプルではfor文でObservableを作っていますね。
Iteraorを持っていない連続したデータをObservableにしたいときにも使うでしょう。
just
Observableにする値を直接指定します。
[ReactiveX - Just operator][just]
画像では一つしかObservableに流れていませんが10個まで指定できます。
Observable.just(1,2,3,4,5,6,7,8,9)
テストとかに使うんですかね。
repaet
onCompletedを通った際もう一度Suscribeをしてくれます。文面じゃ理解しにくいので図とコードをさっそく見てください。
赤緑青をObservableに流します。データとしては横の棒で終了しますが、repeatOperatorを通ったあとは同じデータがsubscribeされ続けます。
まずは
Observable.just(1,2,3,4,5,6,7,8,9)
.repeat(5)
回数の指定。そりゃできます。
repeatWhen
repeatのお友達でリピートする間隔を指定します。
データは黄色、赤の〇。それが棒で終了したら青いものが発行されてdelay時間を挟んでからもう一度subscribeします。
Observable.just(1,2,3,4,5,6,7,8,9)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.delay(5, TimeUnit.DAYS);
}
});
}
詳しくはこの記事を参照してみてください。私も非常に勉強になりました。
Empty/Never/Throw
特殊な三人組です。
EmptyはOnCompleteだけ飛んできます。
Neverはそれすらも呼ばれない空のObservableです。
ThrowはonErrorを飛ばすObservableを流します。
一応発火のイメージを把握するために画像も見てましょう。
最後に発火しています。こちらがOnCompletedです。
ごらんのとおり発火すらしません。
Errorの発火をしています。
from
IteraotrをもつオブジェクトからObservableを生成します。
あり得ないほどわかりやすいと思います。
Arrayだったら大体そうでしょう。
String[] array = new String[]{"sasaki","ササキ","ささき","佐々木"};
Observable.from(array)
ちなみにHashMapからデータを取る際は、
Observable.from(map.entrySet())
といった具合です。
range
intで始まりと終わりを指定してその間の数字のObservableを生成します。
めちょ簡単です。
Observable.range(1,10);
フィルタリング
作ったでーたをフィルタリングするものです。
流れてきたデータに対してonNextを行わせるかどうか、という処理を主にするオペレーターです。
filter
文字通りフィルタリングをしてくれます。
コレクション操作でもかなり使うポピュラーなオペレーターです。
[ReactiveX - Filter operator][filter]
この図だと渡されたObservarの中から〇だけをフィルタリングして、〇形状のものだけをonNextしています。
Observable.just(1,10,4,21,19,2,13,9)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return (item < 15);
}
})
関数Func1の中で戻り値がtrueであればその値がonNextに到達するわけです。
take
個数を指定して取り出します。
[ReactiveX - Take operator][take]
図の通りです。
Observable.just(1,10,4,21,19,2,13,9)
.take(2)
先頭2つを取り出します。
takeLastを用いれば後ろからデータを取得することができます。
Observable.range(1,10)
.takeLast(3)
first/last/elementAt ..OrDefault
最初(最後、もしくは指定したところ)だけ取得してOnNextします。
要素一つのみをonNextするのが特徴です。
最初の赤いやつだけをObservableに流しています。
最後のピンクだけObservableに流しています。
指定された番号2だけをObservableに流しています。
Observable.just(1,2,3,4,5,6)
.first();
Observable.just(1,2,3,4,5,6)
.last();
Observable.just(1,2,3,4,5,6)
.firstOrDefault(1);
OrDefaultがあることで安全な設計ができますね。
Observable.just(1,2,3,4,5,6)
.elementAt(3);
Observable.just(1,2,3,4,5,6)
.elementAtOrDefault(2,11);
0からスタートする点に注意です。
こうすることで五番目が存在しなくても例外が発生せず、Observableに値が流れます。
sample
一定時間ごとに下のストリームに値を流します。
引数は sample(long,TimeUnit)
です。
この図だとわかりやすいですね。
一定時間立つと直前のストリームに流れていた値をsubscribeします。
Observable.interval(1,TimeUnit.DAYS)
.sample(2,TimeUnit.DAYS)
動作確認はしてませんがこのように使います。
ニュアンスだけ感じ取っていただければ。
サンプリング周期とか言いませんか?あれです。上記のコードでは1日毎にObservableにデータが流れてくるのを2日に一回onNextします。
ログインボーナス毎日もらえるけど2日に一回だけもらう、ってイメージでしょうか(?)
throttle First/Last
throttleFirst と throttleLast です。引数はsampleと同じく throttleFirst(long,TimeUnit)
です。
一定時間、onNextはしませんよという具合のものです。
黄色い〇から見て次に下のストリームに流れたのは緑を飛ばして水色の〇です。なぜかというとlongで指定した時間立っていないので緑が通りません。
Observable.interval(1,TimeUnit.DAYS)
.throttleFirst(3,TimeUnit.DAYS)
もう気が付いている方もいるかもしれませんが throttleLast
はsampleと同じ動きをします。
distinct
Observable.just(1,2,2,2,1,3,4,1)
.distinct()
.subscribe(
new Observer<Integer>() {
@Override
public void onCompleted() {
Log.d("onCompleted", "owari");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
Log.d("onNext", String.valueOf(integer));
}
}
);
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 1
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 2
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 3
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 4
実行結果はこのような形です。
DistinctUntilChanged
重複があった場合一つにまとめる(ここまでDistinctと一緒)
しかし、他の値の入力があった場合まとめない。
赤 -> 黄 -> 黄 ときたので黄色は1つにまとめられるが再び赤が発火すると黄色はもう一度呼び出される。
Observable.just(1,2,2,2,1,2,2,3,3,1)
.distinctUntilChanged()
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 2
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 2
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 3
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1
わりと使いそう。
Debounce
私の日本語力では説明できませんでしたので図を見てください。
指定した時間データが流れてこなければ直前のデータでイベント発火する、という具合です。
図を見てもわからないと思うのでandroidでどのような事例で使うかを紹介します。
RxView.clicks(button).debounce(2,TimeUnit.SECONDS)
.subscribe(new Observer<Void>() {
@Override
public void onCompleted() {
Log.d("owari","owari");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Void aVoid) {
textView.setText("owari");
}
});
RxBindingでButtonのクリックを検知します。
クリックするたびにObservableが流れます。
しかし、debounceで2秒指定しているので2秒以内にもう一度ボタンをタップした場合onNextしません。二秒タップしないでいるとイベント発火が発生します。
ignoreElements
どんなObservableであっても通しません。
onNextも呼び出されないのでonError,OnCompletedのみ呼び出されます。
skip
skipは二つ使い方が存在します。
まずはオーソドックスなほう
Observable.range(1,10)
.skip(3)
引数で指定した数だけonNextをしません。
また、skipLastというものも存在します。
Observable.range(1,10)
.skipLast(3)
後ろの三つを省きます。
もう一つは
時間で指定します。引数で渡された時間の間はonNextしません。
Observable.range(1,11)
.skip(10,TimeUnit.DAYS)
10日間onNextしません。
変換
Observableに流れているデータを変換してsubscribeしたりもするでしょう。
そんな時に役に立つOperatorを紹介しましょう。
map
流れてきたデータを整形します。
キャストっぽいことを行うイメージがいいでしょう。
Observable.range(1,10)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer.toString();
}
})
こんな形です。
こうするとonNextに流れてくるデータはStringのデータが帰ってくるわけです。
flatMap
一つのアイテムから複数のアイテムにしたりするのに使います。
図のパターンではflatMapの条件は〇を◇2つに分解しています。
そのためonNextされるデータは「赤い〇1つ」からは「赤い◇2つ」が生成されています。flatMapではObservableを生成している、というのが肝です。
では例でみてみましょう。
String[] place = {"静岡県磐田市","神奈川県川崎市","千葉県柏市","石川県金沢市","宮城県仙台市","茨城県鹿嶋市"};
Observable.from(place)
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.from(s.split("県"));
}})
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.contains("市");
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("city",s);
}
});
サッカーチームのある市を県から書いたStringの配列をObservableとします。
県の情報は不要なので市のみのデータを取ります。
まずは元のデータからflatMapを用いて「県」で区切った新たなObservableを作ります。要素数はこれで倍になります。
県の情報は不要なので「市」の含まれていないほうのデータはfilterで捨てます。
こういったことができるわけです。
参考までに実行結果は以下です。
10-18 12:44:27.492 2776-2776/com.example.yukin.rxjava1st D/city: 磐田市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 川崎市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 柏市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 金沢市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 仙台市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 鹿嶋市
concatMap
flatMapの親戚です。
動きとしてはflatMapと一緒です。何が違うかというとObservableに流れてきた順番を守ります。直列操作です。一方でflatMapは順番を守らない並列操作となります。
switchMap
こいつも親戚です。
並列操作です。しかし、割り込まれた場合そいつはもうObservableを生成しなくなります。
buffer
少し今までのものと色は違います。指定した数でObservableのデータをまとめてlistでonNextします。
下の信号機みたいなものがlist<〇>です。
ではコードを見てみましょう。
Observable.range(1,16)
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(List<Integer> integers) {
Log.d("next",integers.get(integers.size()-1).toString());
}
});
上記の例では1~16のObservableを3つ区切りでlistします。
一つ余りますのでどのような処理になるかというと、
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 3
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 6
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 9
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 12
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 15
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 16
このような出力結果になります。残った数もしっかりlist化されているのがわかりますね。
またこのoperatorはskipもできます。
2つ区切り3つ目を飛ばす、といった具合です。
.buffer(2,3)
先ほどのrangeに対してこのように渡します。
わかりやすいようにonNextを変更してlistをすべて表示
出力結果は
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 1
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 2
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 4
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 5
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 7
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 8
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 10
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 11
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 13
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 14
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 16
1 -> 2 -> 3飛ばして 4 -> 5 -> 6飛ばして..
といった処理をしているのがわかると思います。
groupBy
そのまんまです。
条件式を渡してそれに沿ってObservableを分けます。
〇は〇 ▽は▽ 腐るものは腐らせ、焼くものは焼いて、地球クリーンry
コードを張ります。
Observable.range(1, 16)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer % 3;
}
})
.subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
integerIntegerGroupedObservable.toList().subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(List<Integer> integers) {
for (int item : integers)
Log.d(String.valueOf(integerIntegerGroupedObservable.getKey()), String.valueOf(item));
}
});
}
});
ちょっとわかりにくいですが1~16のObservableをgroupByします。3で割った数の余りでグループ化します。
そしてもらうデータは「integerIntegerGroupedObservable」というものでまたObservableなのでそれをtoListしてSubscribeします。
getKeyをやるとなんのグループなのかがわかります。
では実行結果を見てみましょう。
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 3
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 6
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 9
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 12
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 15
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 1
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 4
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 7
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 10
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 13
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 16
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 2
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 5
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 8
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 11
10-18 14:04:03.286 9241-9241/com.example.yukin.rxjava1st D/2: 14
余りが0の3の倍数はkeyが0です。
余りが1の1+3の倍数はkeyが1...
といった形になります。
少しコードは醜くなりますが扱いやすいOperatorなので。
scan
Observableから与えられたものを順番に足していきます。
赤から赤緑そして、赤緑青となります。黄色も足したいですね。
String[] completeness = {"ネオ","アームストロング","サイクロン","ジェット","アームストロング砲"};
Observable.from(completeness)
.scan(new Func2<String, String, String>() {
@Override
public String call(String s, String s2) {
return s+s2;
}})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("owari","owari");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
Log.d("完成度",s);
}
});
コードはこのような感じです。Func2を使います。
ちなみにmapのような使いかたももちろんできます。
実行結果は
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオ
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロング
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロン
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロンジェット
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロンジェットアームストロング砲
完成度たけぇなおい。
時間系
タイムアウトさせたり遅延させたりさせるやつらです。
interval
指定した時間毎にonNextします。
無限にし続けるのでtakeで回数を指定したりして使います。
Observable.interval(5,TimeUnit.SECONDS)
.take(1)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Long aLong) {
}
});
五秒後に1回だけonNextそしてonCompletedが呼ばれます。
timeout
文字通りタイムアウトさせます。
これは最後の要素が流れてから次の要素が流れるまでに指定した時間だけ過ぎた場合そこでonCompletedに移動します。
timeout(long,TimeUnit)
またtimeOut後に用意しておいたObservableを流すこともできます。
引数は
timeout(long,TimeUnit,Observable)
となっています。
delay
.delay(long,TimeUnit)
doOnNext(completed,error)
onほにゃららのときにストリームには一切関与しませんが(したくないけど)値を取り出したいときに使います。
なんだかこの図すごくわかりにくいというか意味をなしていない気がします。
サンプルを見て確認してみましょう。
final ArrayList list = new ArrayList();
Observable.range(1,10)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
if(integer%2 == 0) list.add(integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d("owari","尾張の国");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
Log.d("onNext",String.valueOf(integer));
}
});
next以外にもerroとcompletedで使えます。
組み合わせ
Observable同士を混ぜたり待ち合わせしたりします。
merge
どちらか片方のObservableのデータが終わるまでストリームに。順番は流れてきた順に流します。
mergeですよ。
上のストリームが終わったら終了しているのがわかると思います。
応用的に使うと、片方を時間で発火するObservableを作ってmergeすることでtimeoutさせることができるというわけです。まぁOperatorにtimeoutがあるんですがね。
Observable<Integer> observable1 = Observable.range(1,100)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribeOn(Schedulers.newThread());
Observable<Integer> observable2 = Observable.range(1,100)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 1;
}
})
.subscribeOn(AndroidSchedulers.mainThread());
Observable.concat(observable1,obs
ervable2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.d("ok","owari");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.d("ame",String.valueOf(integer));
}
});
こんな感じのを用意してみました。
concat
mergeに似ていますが、違う点はObservableからstreamに流す順番が決まっていることです。引数で渡された順に終わるまで待ちます。
startWith
iteratorやObservableを渡すことができます。そいつを先頭においてスタートします。concatと似ていますし同じ使い方ができますね。
こちらは値を渡している図
こちらは横の棒が入ったObservableを渡している図です。
combineLatest
すごく雑に混ぜてくれるやつです。
二つのストリームを統合するものなのですがストリームを待ったりはしません。
色ストリームと図形ストリームがあります。
一回目は色ストリームと図形ストリームがそろった発火するのですが二回目以降は待ち合わせは発生しません。次に流れているオレンジのアイテムを前回発火した図形である◇と合わせて発火する、といったものになります。
コードはあんまりいい例ではないですが参考程度に。
Observable<TimeInterval<Long>> observable1 = Observable.interval(4,TimeUnit.SECONDS)
.timeInterval()
.take(10);
Observable<TimeInterval<Long>> observable2 = Observable.interval(1,TimeUnit.SECONDS)
.timeInterval()
.take(20);
Observable.combineLatest(observable1, observable2, new Func2<TimeInterval<Long>, TimeInterval<Long>, Long>() {
@Override
public Long call(TimeInterval<Long> longTimeInterval, TimeInterval<Long> longTimeInterval2) {
return longTimeInterval.getValue()+longTimeInterval2.getValue();
}
}).subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.d("owari","owari");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.d("aLong",String.valueOf(aLong));
}
});
雰囲気をつかんでもらえればいいなと思っていて、
Observable同士を合成します。
fun2で実行します。
zip
二つ以上のObservableから流れてきた値を合成して新しい一つの値を作ります。
こちらは一つずつしっかり値を待ちます。combineLatestよりしっかり仕事するやつ、みたいなイメージでしょうか。
図形Observableと大きさObservableと色Observableに分かれています。
手前2つのデータが出そろい、色Observableが流れてきたタイミングで発火しています。
以上
長すぎてkobitoが重いです。