ReactiveExtensions
RxJS
Rx
RxJSDay 5

RxJS の Operators (3) - Observable のフィルター

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 5 日目かつ RxJS Advent Calendar 2015 の 5 日目です。


はじめに

今日は ReactiveX の Operators の Filtering について RxJS の API ドキュメントやソースコードを見ていきます。

また RxJS 4.0.7 を対象にしています。


Observable のフィルター


Observable.prototype.filter

Array.prototype.filter でもおなじみのものです。動きについては説明は不要でしょう。

import { Observable } from 'rx';

Observable
.from([1, 2, 3, 4, 5, 6])
.filter(value => value % 2 === 0)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted


Observable.prototype.first / Observable.prototype.last

最初の値や最後の値のみにフィルターします。Observable.prototype.filter のように predicate をとることもできます。

import { Observable } from 'rx';

Observable
.from([1, 2, 3, 4, 5, 6])
.first()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 1
// onCompleted

import { Observable } from 'rx';

Observable
.from([1, 2, 3, 4, 5, 6])
.last()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 6
// onCompleted

前回 Observable.prototype.scan の説明でも書いたとおり、onCompleted を待たないケースもままあるので、そういう場面では Observable.prototype.last は使いづらいかもしれません。

また後述の firsttake(1)elementAt(0) と、lasttakeLast(1) とはほとんど同じです (実装は別ものです) 。


Observable.prototype.elementAt

n 番目の要素だけにフィルターします。takeskip と違い、引数は count ではなく index です (これは間違えないと思いますが) 。


Observable.prototype.skip / Observable.prototype.take

既に前述の説明で紹介してしまいましたが、skip は n 個の要素をスキップし、take は n 個の要素をとる形でフィルターします。

import { Observable } from 'rx';

Observable
.from([1, 2, 3, 4, 5, 6])
.skip(2)
.take(2)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 3
// onNext: 4
// onCompleted

skip および take はその派生版も便利だと思います。

たとえば Observable.prototype.skipWhilepredicate を取り、条件が真の間は値をスキップします。

import { Observable } from 'rx';

Observable
.from([1, 2, 3, 4, 5, 6])
.skipWhile(value => value < 4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 4
// onNext: 5
// onNext: 6
// onCompleted

個人的には firstpredicate を取るなら skippredicate を取ればいいのにと思うのですが (index も引数で取れますし……) 、どうなんでしょうね。


Observable.prototype.sample / Observable.prototype.debounce

一定間隔ごとに最後の値を取ります。

正直なところ、ぼくはよく分かっていません。おそらく debounce は次の値まで指定した間隔があいた場合だけ流す、sample は指定した間隔ごとに流す、のような気がします。なので debounce だと指定した間隔よりも短い間隔で値が流れ続けるるとフィルターされて流れてこないし、 sample は指定した間隔ごとに流してくれるっぽいです。


Observable.prototype.distinct

重複していないものだけを流します。

import { Observable } from 'rx';

Observable
.from([1, 2, 2, 3, 1, 4])
.distinct()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 1
// onNext: 2
// onNext: 3
// onNext: 4
// onCompleted

派生版の distinctUntilChanged は直前のものから変化すると流れてきます。

import { Observable } from 'rx';

Observable
.from([1, 2, 2, 3, 1, 4])
.distinctUntilChanged()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 1
// onNext: 2
// onNext: 3
// onNext: 1
// onNext: 4
// onCompleted

distinct の重複チェックにはすこし注意が必要で、同一 object の内容を変更した場合には同一のものと見なされます。内部で直近の値 (参照) を保持していて === で比較しているからです。別 object にする (イミュータブルな object として運用する) か、keySelectorobject の変更を検知するための hashCode などを指定すると良いと思います。


Observable.prototype.ignoreElements

onNext を流しません。すべての要素を無視します。 onErroronCompleted は流れます。

import { Observable } from 'rx';

Observable
.from([1, 2, 2, 3, 1, 4])
.ignoreElements()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onCompleted


おわりに

今日は Observable をフィルターする Operator を見てみました。