LoginSignup
61

More than 5 years have passed since last update.

RxJavaOperator解説と使い方

Last updated at Posted at 2016-10-25

よく使うやつから絶対つかわねーだろみたいなやつ書きました。
たまーに見返して毎日をゆたかにしましょう。
もしよければこっちに基本的な使い方が書いてあるのでどうぞ

生成

create

create.c.png
[ReactiveX - Create operator][create]
別ObservableのsubscribeによってObservalbeを生成するオペレーター。
意味が分かれば図の通りなんだけど意味わかんないと思うのでサンプルコードを。
ではどういったときに使うかというと非同期処理等でエラーハンドリングが必要な際に使います。
では具体例を。

Observable<String> observable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    FileInputStream fileInputStream;
                    fileInputStream = openFileInput("MyFile.txt");
                    byte[] readBytes = new byte[fileInputStream.available()];
                    fileInputStream.read(readBytes);
                    subscriber.onNext(new String(readBytes));

                    fileInputStream = openFileInput("2ndMyFile.txt");
                    byte[] readBytes2nd = new byte[fileInputStream.available()];
                    fileInputStream.read(readBytes2nd);
                    subscriber.onNext(new String(readBytes2nd));

                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }

こんな使い方するかどうかは別としてcreateを使うときは例外処理が発生するときだと思っています。
自分でsubscribe発火を制御する感じです。
公式サンプルではfor文でObservableを作っていますね。
Iteraorを持っていない連続したデータをObservableにしたいときにも使うでしょう。

just

Observableにする値を直接指定します。
just.png
[ReactiveX - Just operator][just]
画像では一つしかObservableに流れていませんが10個まで指定できます。

Observable.just(1,2,3,4,5,6,7,8,9)

テストとかに使うんですかね。

repaet

onCompletedを通った際もう一度Suscribeをしてくれます。文面じゃ理解しにくいので図とコードをさっそく見てください。
repeat.o.png
赤緑青をObservableに流します。データとしては横の棒で終了しますが、repeatOperatorを通ったあとは同じデータがsubscribeされ続けます。
まずは

Observable.just(1,2,3,4,5,6,7,8,9)
        .repeat(5)

回数の指定。そりゃできます。

repeatWhen

repeatのお友達でリピートする間隔を指定します。
repeatWhen.f.png
データは黄色、赤の〇。それが棒で終了したら青いものが発行されてdelay時間を挟んでからもう一度subscribeします。

Observable.just(1,2,3,4,5,6,7,8,9)
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                return observable.delay(5, TimeUnit.DAYS);
            }
        });
    }

詳しくはこの記事を参照してみてください。私も非常に勉強になりました。

Empty/Never/Throw

特殊な三人組です。
EmptyはOnCompleteだけ飛んできます。
Neverはそれすらも呼ばれない空のObservableです。
ThrowはonErrorを飛ばすObservableを流します。
一応発火のイメージを把握するために画像も見てましょう。
empty.c.png
最後に発火しています。こちらがOnCompletedです。
never.c.png
ごらんのとおり発火すらしません。
throw.c.png
Errorの発火をしています。

from

IteraotrをもつオブジェクトからObservableを生成します。
あり得ないほどわかりやすいと思います。
Arrayだったら大体そうでしょう。
from.c.png

String[] array = new String[]{"sasaki","ササキ","ささき","佐々木"};
Observable.from(array)

ちなみにHashMapからデータを取る際は、

Observable.from(map.entrySet())

といった具合です。

range

intで始まりと終わりを指定してその間の数字のObservableを生成します。range.png
めちょ簡単です。

 Observable.range(1,10);

フィルタリング

作ったでーたをフィルタリングするものです。
流れてきたデータに対してonNextを行わせるかどうか、という処理を主にするオペレーターです。

filter

文字通りフィルタリングをしてくれます。
コレクション操作でもかなり使うポピュラーなオペレーターです。
filter.png
[ReactiveX - Filter operator][filter]
この図だと渡されたObservarの中から〇だけをフィルタリングして、〇形状のものだけをonNextしています。

Observable.just(1,10,4,21,19,2,13,9)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer item) {
                return (item < 15);
            }
        })

関数Func1の中で戻り値がtrueであればその値がonNextに到達するわけです。

take

個数を指定して取り出します。
take.png
[ReactiveX - Take operator][take]
図の通りです。

Observable.just(1,10,4,21,19,2,13,9)
    .take(2)

先頭2つを取り出します。

takeLastを用いれば後ろからデータを取得することができます。

Observable.range(1,10)
        .takeLast(3)

first/last/elementAt ..OrDefault

最初(最後、もしくは指定したところ)だけ取得してOnNextします。
要素一つのみをonNextするのが特徴です。
first.png
最初の赤いやつだけをObservableに流しています。
last.png
最後のピンクだけObservableに流しています。
elementAt.png
指定された番号2だけをObservableに流しています。

Observable.just(1,2,3,4,5,6)
        .first();
Observable.just(1,2,3,4,5,6)
        .last();
Observable.just(1,2,3,4,5,6)
        .firstOrDefault(1);

OrDefaultがあることで安全な設計ができますね。

Observable.just(1,2,3,4,5,6)
        .elementAt(3);
Observable.just(1,2,3,4,5,6)
        .elementAtOrDefault(2,11);

elementAt.png
0からスタートする点に注意です。
elementAtOrDefault.png
こうすることで五番目が存在しなくても例外が発生せず、Observableに値が流れます。

sample

一定時間ごとに下のストリームに値を流します。
引数は sample(long,TimeUnit) です。
sample.png
この図だとわかりやすいですね。
一定時間立つと直前のストリームに流れていた値をsubscribeします。

Observable.interval(1,TimeUnit.DAYS)
        .sample(2,TimeUnit.DAYS)

動作確認はしてませんがこのように使います。
ニュアンスだけ感じ取っていただければ。
サンプリング周期とか言いませんか?あれです。上記のコードでは1日毎にObservableにデータが流れてくるのを2日に一回onNextします。
ログインボーナス毎日もらえるけど2日に一回だけもらう、ってイメージでしょうか(?)

throttle First/Last

throttleFirst と throttleLast です。引数はsampleと同じく throttleFirst(long,TimeUnit) です。
一定時間、onNextはしませんよという具合のものです。
throttleFirst.png
黄色い〇から見て次に下のストリームに流れたのは緑を飛ばして水色の〇です。なぜかというとlongで指定した時間立っていないので緑が通りません。

Observable.interval(1,TimeUnit.DAYS)
        .throttleFirst(3,TimeUnit.DAYS)

もう気が付いている方もいるかもしれませんが throttleLast はsampleと同じ動きをします。

distinct

重複なくonNextに流します。
distinct.png

Observable.just(1,2,2,2,1,3,4,1)
        .distinct()
        .subscribe(
                new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        Log.d("onCompleted", "owari");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("onNext", String.valueOf(integer));
                    }
                }
        );
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 1
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 2
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 3
10-07 20:58:22.086 14941-14941/com.example.yukin.rxjava1st D/onNext: 4

実行結果はこのような形です。

DistinctUntilChanged

重複があった場合一つにまとめる(ここまでDistinctと一緒)
しかし、他の値の入力があった場合まとめない。
distinctUntilChanged.png
赤 -> 黄 -> 黄 ときたので黄色は1つにまとめられるが再び赤が発火すると黄色はもう一度呼び出される。

Observable.just(1,2,2,2,1,2,2,3,3,1)
        .distinctUntilChanged()
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 2
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 2
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 3
10-07 21:10:14.459 23959-23959/com.example.yukin.rxjava1st D/onNext: 1

わりと使いそう。

Debounce

私の日本語力では説明できませんでしたので図を見てください。
debounce.png
指定した時間データが流れてこなければ直前のデータでイベント発火する、という具合です。
図を見てもわからないと思うのでandroidでどのような事例で使うかを紹介します。

RxView.clicks(button).debounce(2,TimeUnit.SECONDS)
        .subscribe(new Observer<Void>() {
            @Override
            public void onCompleted() {
                Log.d("owari","owari");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Void aVoid) {
                textView.setText("owari");
            }
        });

RxBindingでButtonのクリックを検知します。
クリックするたびにObservableが流れます。
しかし、debounceで2秒指定しているので2秒以内にもう一度ボタンをタップした場合onNextしません。二秒タップしないでいるとイベント発火が発生します。

ignoreElements

どんなObservableであっても通しません。
ignoreElements.c.png
onNextも呼び出されないのでonError,OnCompletedのみ呼び出されます。

skip

skipは二つ使い方が存在します。
まずはオーソドックスなほう
skip.png

Observable.range(1,10)
        .skip(3)

引数で指定した数だけonNextをしません。
また、skipLastというものも存在します。

Observable.range(1,10)
        .skipLast(3)

後ろの三つを省きます。

もう一つはskip2.png
時間で指定します。引数で渡された時間の間はonNextしません。

Observable.range(1,11)
        .skip(10,TimeUnit.DAYS)

10日間onNextしません。

変換

Observableに流れているデータを変換してsubscribeしたりもするでしょう。
そんな時に役に立つOperatorを紹介しましょう。

map

流れてきたデータを整形します。
map.png
キャストっぽいことを行うイメージがいいでしょう。

Observable.range(1,10)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer.toString();
            }
        })

こんな形です。
こうするとonNextに流れてくるデータはStringのデータが帰ってくるわけです。

flatMap

一つのアイテムから複数のアイテムにしたりするのに使います。
mergeMap.png
図のパターンではflatMapの条件は〇を◇2つに分解しています。
そのためonNextされるデータは「赤い〇1つ」からは「赤い◇2つ」が生成されています。flatMapではObservableを生成している、というのが肝です。
では例でみてみましょう。

String[] place = {"静岡県磐田市","神奈川県川崎市","千葉県柏市","石川県金沢市","宮城県仙台市","茨城県鹿嶋市"};
Observable.from(place)
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable.from(s.split("県"));
            }})
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.contains("市");
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.d("city",s);
            }
        });

サッカーチームのある市を県から書いたStringの配列をObservableとします。
県の情報は不要なので市のみのデータを取ります。
まずは元のデータからflatMapを用いて「県」で区切った新たなObservableを作ります。要素数はこれで倍になります。
県の情報は不要なので「市」の含まれていないほうのデータはfilterで捨てます。
こういったことができるわけです。

参考までに実行結果は以下です。

10-18 12:44:27.492 2776-2776/com.example.yukin.rxjava1st D/city: 磐田市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 川崎市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 柏市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 金沢市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 仙台市
10-18 12:44:27.493 2776-2776/com.example.yukin.rxjava1st D/city: 鹿嶋市

concatMap

flatMapの親戚です。
concatMap.png
動きとしてはflatMapと一緒です。何が違うかというとObservableに流れてきた順番を守ります。直列操作です。一方でflatMapは順番を守らない並列操作となります。

switchMap

こいつも親戚です。
switchMap.png
並列操作です。しかし、割り込まれた場合そいつはもうObservableを生成しなくなります。

buffer

少し今までのものと色は違います。指定した数でObservableのデータをまとめてlistでonNextします。
buffer3.png
下の信号機みたいなものがlist<〇>です。
ではコードを見てみましょう。

Observable.range(1,16)
        .buffer(3)
        .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(List<Integer> integers) {
                    Log.d("next",integers.get(integers.size()-1).toString());
            }
        });

上記の例では1~16のObservableを3つ区切りでlistします。
一つ余りますのでどのような処理になるかというと、

10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 3
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 6
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 9
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 12
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 15
10-18 13:24:04.115 6606-6606/com.example.yukin.rxjava1st D/next: 16

このような出力結果になります。残った数もしっかりlist化されているのがわかりますね。

またこのoperatorはskipもできます。
buffer4.png
2つ区切り3つ目を飛ばす、といった具合です。

.buffer(2,3)

先ほどのrangeに対してこのように渡します。
わかりやすいようにonNextを変更してlistをすべて表示
出力結果は

10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 1
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 2
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 4
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 5
10-18 13:38:09.829 6606-6606/com.example.yukin.rxjava1st D/next: 7
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 8
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 10
10-18 13:38:09.830 6606-6606/com.example.yukin.rxjava1st D/next: 11
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 13
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 14
10-18 13:38:09.831 6606-6606/com.example.yukin.rxjava1st D/next: 16

1 -> 2 -> 3飛ばして 4 -> 5 -> 6飛ばして..

といった処理をしているのがわかると思います。

groupBy

そのまんまです。
条件式を渡してそれに沿ってObservableを分けます。
groupBy.png
〇は〇 ▽は▽ 腐るものは腐らせ、焼くものは焼いて、地球クリーンry

コードを張ります。


Observable.range(1, 16)
        .groupBy(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer % 3;
            }
        })
        .subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
                integerIntegerGroupedObservable.toList().subscribe(new Subscriber<List<Integer>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(List<Integer> integers) {
                        for (int item : integers)
                            Log.d(String.valueOf(integerIntegerGroupedObservable.getKey()), String.valueOf(item));
                    }
                });
            }
        });

ちょっとわかりにくいですが1~16のObservableをgroupByします。3で割った数の余りでグループ化します。
そしてもらうデータは「integerIntegerGroupedObservable」というものでまたObservableなのでそれをtoListしてSubscribeします。
getKeyをやるとなんのグループなのかがわかります。

では実行結果を見てみましょう。

10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 3
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 6
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 9
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 12
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/0: 15
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 1
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 4
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 7
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 10
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 13
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/1: 16
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 2
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 5
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 8
10-18 14:04:03.285 9241-9241/com.example.yukin.rxjava1st D/2: 11
10-18 14:04:03.286 9241-9241/com.example.yukin.rxjava1st D/2: 14

余りが0の3の倍数はkeyが0です。
余りが1の1+3の倍数はkeyが1...
といった形になります。
少しコードは醜くなりますが扱いやすいOperatorなので。

scan

Observableから与えられたものを順番に足していきます。
scan.png
赤から赤緑そして、赤緑青となります。黄色も足したいですね。

String[] completeness = {"ネオ","アームストロング","サイクロン","ジェット","アームストロング砲"};

Observable.from(completeness)
        .scan(new Func2<String, String, String>() {
            @Override
            public String call(String s, String s2) {
                return s+s2;
            }})
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d("owari","owari");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String s) {
                Log.d("完成度",s);
            }
        });

コードはこのような感じです。Func2を使います。
ちなみにmapのような使いかたももちろんできます。

実行結果は

10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオ
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロング
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロン
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロンジェット
10-20 19:30:41.575 16269-16269/com.example.yukin.rxjava1st D/完成度: ネオアームストロングサイクロンジェットアームストロング砲

完成度たけぇなおい。

時間系

タイムアウトさせたり遅延させたりさせるやつらです。

interval

指定した時間毎にonNextします。
無限にし続けるのでtakeで回数を指定したりして使います。

interval.png

                    Observable.interval(5,TimeUnit.SECONDS)
                            .take(1)
                            .subscribe(new Subscriber<Long>() {
                                @Override
                                public void onCompleted() {
                                }

                                @Override
                                public void onError(Throwable e) {
                                    e.printStackTrace();
                                }

                                @Override
                                public void onNext(Long aLong) {
                                }
                            });

五秒後に1回だけonNextそしてonCompletedが呼ばれます。

timeout

文字通りタイムアウトさせます。
これは最後の要素が流れてから次の要素が流れるまでに指定した時間だけ過ぎた場合そこでonCompletedに移動します。

timeout.1.png

timeout(long,TimeUnit)

またtimeOut後に用意しておいたObservableを流すこともできます。
timeout.2.png

引数は

timeout(long,TimeUnit,Observable)

となっています。

delay

発火のタイミングを任意時間遅らせます。
delay.png

.delay(long,TimeUnit)

doOnNext(completed,error)

onほにゃららのときにストリームには一切関与しませんが(したくないけど)値を取り出したいときに使います。
doOnNext.png
なんだかこの図すごくわかりにくいというか意味をなしていない気がします。
サンプルを見て確認してみましょう。

        final ArrayList list = new ArrayList();
        Observable.range(1,10)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        if(integer%2 == 0) list.add(integer);
                        }
                    }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("owari","尾張の国");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("onNext",String.valueOf(integer));
            }
        });

next以外にもerroとcompletedで使えます。

組み合わせ

Observable同士を混ぜたり待ち合わせしたりします。

merge

どちらか片方のObservableのデータが終わるまでストリームに。順番は流れてきた順に流します。
merge.png
mergeですよ。

上のストリームが終わったら終了しているのがわかると思います。
応用的に使うと、片方を時間で発火するObservableを作ってmergeすることでtimeoutさせることができるというわけです。まぁOperatorにtimeoutがあるんですがね。

Observable<Integer> observable1 = Observable.range(1,100)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer % 2 == 0;
            }
        })
        .subscribeOn(Schedulers.newThread());
Observable<Integer> observable2 = Observable.range(1,100)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer % 2 == 1;
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread());

Observable.concat(observable1,obs
ervable2)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("ok","owari");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d("ame",String.valueOf(integer));
            }
        });

こんな感じのを用意してみました。

concat

mergeに似ていますが、違う点はObservableからstreamに流す順番が決まっていることです。引数で渡された順に終わるまで待ちます。
concat.png

startWith

iteratorやObservableを渡すことができます。そいつを先頭においてスタートします。concatと似ていますし同じ使い方ができますね。

startWith.png
こちらは値を渡している図
startWith.o.png
こちらは横の棒が入ったObservableを渡している図です。

combineLatest

すごく雑に混ぜてくれるやつです。
二つのストリームを統合するものなのですがストリームを待ったりはしません。
combineLatest.png

色ストリームと図形ストリームがあります。
一回目は色ストリームと図形ストリームがそろった発火するのですが二回目以降は待ち合わせは発生しません。次に流れているオレンジのアイテムを前回発火した図形である◇と合わせて発火する、といったものになります。

コードはあんまりいい例ではないですが参考程度に。

Observable<TimeInterval<Long>> observable1 = Observable.interval(4,TimeUnit.SECONDS)
        .timeInterval()
        .take(10);

Observable<TimeInterval<Long>> observable2 = Observable.interval(1,TimeUnit.SECONDS)
        .timeInterval()
        .take(20);

Observable.combineLatest(observable1, observable2, new Func2<TimeInterval<Long>, TimeInterval<Long>, Long>() {
    @Override
    public Long call(TimeInterval<Long> longTimeInterval, TimeInterval<Long> longTimeInterval2) {
        return longTimeInterval.getValue()+longTimeInterval2.getValue();
    }
}).subscribe(new Observer<Long>() {
    @Override
    public void onCompleted() {
        Log.d("owari","owari");
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d("aLong",String.valueOf(aLong));
    }
});

雰囲気をつかんでもらえればいいなと思っていて、
Observable同士を合成します。
fun2で実行します。

zip

二つ以上のObservableから流れてきた値を合成して新しい一つの値を作ります。
こちらは一つずつしっかり値を待ちます。combineLatestよりしっかり仕事するやつ、みたいなイメージでしょうか。

zip.o.png

図形Observableと大きさObservableと色Observableに分かれています。
手前2つのデータが出そろい、色Observableが流れてきたタイミングで発火しています。

以上

長すぎてkobitoが重いです。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
61