この記事は bouzuya's RxJS Advent Calendar 2015 の 10 日目かつ RxJS Advent Calendar 2015 の 10 日目です。
はじめに
今日は昨日に続いて ReactiveX の Observable Utility Operators について RxJS の API ドキュメントやサンプルコードを書いていきます。
また RxJS 4.0.7 を対象にしています。
Observable.prototype.timeInterval
- ReactiveX - TimeInterval operator
Observable.prototype.timeIntervalAPI DocumentObservable.prototype.timeIntervalSource Code
値 (value) とその前の値との間隔 (interval) とに変換します。後述の timestamp と似ています。
import { Observable } from 'rx';
Observable
.timer(0, 1000)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:2
// onNext: 1:1002
// onNext: 2:999
// onNext: 3:997
// onCompleted
Observable.prototype.timestamp
- ReactiveX - Timestamp operator
Observable.prototype.timestampAPI DocumentObservable.prototype.timestampSource Code
値 (value) とそのタイムスタンプ (timestamp) とに変換します。前述の timestamp と似ています。
import { Observable } from 'rx';
Observable
.timer(0, 1000)
.timestamp()
.map(({ value, timestamp }) => `${value}:${timestamp}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:1449755362020
// onNext: 1:1449755363025
// onNext: 2:1449755364023
// onNext: 3:1449755365022
// onCompleted
Observable.prototype.timeout
- ReactiveX - Timeout operator
Observable.prototype.timeoutAPI DocumentObservable.prototype.timeoutSource Code
指定した時間よりも interval があいた場合にタイムアウトとしてエラーにします。
import { Observable } from 'rx';
Observable
.concat(
Observable.timer(100),
Observable.timer(200),
Observable.timer(300),
Observable.timer(400),
Observable.timer(500)
)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.timeout(350)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:106
// onNext: 0:224
// onNext: 0:306
// onError: TimeoutError: Timeout has occurred
個人的には Rx のエラーハンドリングはそんなに便利だと思えないので、積極的に使いたいとは思えないのですが……。
Observable.using
- ReactiveX - Using operator
Observable.prototype.usingAPI DocumentObservable.prototype.usingSource Code
まだ Disposable についてまったく触れていないので、あまり書きたくないのですけど……。
.NET ではおなじみの using 構文に近いもの。Java だと try (...) {} が近いです。
動作としては Disposable を必ず dispose してくれるはずです。 subscribe が返す Disposable (Rx 的には Subscription と呼ばれているはず……) を明示的に dispose すると良いはずです。
import { Observable } from 'rx';
class MyDisposable {
dispose() {
console.log('disposed');
}
}
Observable
.using(
() => new MyDisposable(),
(resource) => {
// ... use disposable resource ...
return Observable.empty();
}
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onCompleted
// disposed
この例では明示的に dispose はしていませんが、dispose が呼ばれているので、おそらく onCompleted で dispose されていると思います (未確認) 。
また Disposable 関連のコードを読むときに触れるつもりです。
Observable.prototype.materialize / Observable.prototype.dematerialize
- ReactiveX - Materialize/Dematerialize operator
Observable.prototype.materializeAPI DocumentObservable.prototype.materializeSource CodeObservable.prototype.dematerializeAPI DocumentObservable.prototype.dematerializeSource Code
onNext や onCompleted や onError 自体を操作できるようです。説明が難しいので例を挙げます。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.materialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: OnNext(1)
// onNext: OnNext(2)
// onNext: OnNext(3)
// onNext: OnCompleted()
// onCompleted
ここで流れてきているのは Rx.Notification クラスのインスタンスです。OnNext / OnCompleted / OnError があります。OnNext は value を OnError は error を持ちます。
materialize は Observalbe を流れるデータを Notification に変換します。 dematerialize はその逆を行います。
今度は、明示的に Notification の Observable を dematerialize してみましょう。
import { Observable, Notification } from 'rx';
Observable
.from([
Notification.createOnNext(4),
Notification.createOnNext(5),
Notification.createOnCompleted(),
Notification.createOnNext(6)
])
.dematerialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 4
// onNext: 5
// onCompleted
onCompleted のあとにはデータは流れないようです。当然そうですよね。
メタな操作をしたくなったときに使えるのかもしれません。
おわりに
今日は昨日に続き Observable のユーティリティを見ました。知っておくと便利な場面があるかもしれませんね。