この記事は bouzuya's RxJS Advent Calendar 2015 の 10 日目かつ RxJS Advent Calendar 2015 の 10 日目です。
はじめに
今日は昨日に続いて ReactiveX の Observable Utility Operators について RxJS の API ドキュメントやサンプルコードを書いていきます。
また RxJS 4.0.7 を対象にしています。
Observable.prototype.timeInterval
- ReactiveX - TimeInterval operator
Observable.prototype.timeInterval
API DocumentObservable.prototype.timeInterval
Source Code
値 (value
) とその前の値との間隔 (interval
) とに変換します。後述の timestamp
と似ています。
import { Observable } from 'rx';
Observable
.timer(0, 1000)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:2
// onNext: 1:1002
// onNext: 2:999
// onNext: 3:997
// onCompleted
Observable.prototype.timestamp
- ReactiveX - Timestamp operator
Observable.prototype.timestamp
API DocumentObservable.prototype.timestamp
Source Code
値 (value
) とそのタイムスタンプ (timestamp
) とに変換します。前述の timestamp
と似ています。
import { Observable } from 'rx';
Observable
.timer(0, 1000)
.timestamp()
.map(({ value, timestamp }) => `${value}:${timestamp}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:1449755362020
// onNext: 1:1449755363025
// onNext: 2:1449755364023
// onNext: 3:1449755365022
// onCompleted
Observable.prototype.timeout
- ReactiveX - Timeout operator
Observable.prototype.timeout
API DocumentObservable.prototype.timeout
Source Code
指定した時間よりも interval
があいた場合にタイムアウトとしてエラーにします。
import { Observable } from 'rx';
Observable
.concat(
Observable.timer(100),
Observable.timer(200),
Observable.timer(300),
Observable.timer(400),
Observable.timer(500)
)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.timeout(350)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:106
// onNext: 0:224
// onNext: 0:306
// onError: TimeoutError: Timeout has occurred
個人的には Rx のエラーハンドリングはそんなに便利だと思えないので、積極的に使いたいとは思えないのですが……。
Observable.using
- ReactiveX - Using operator
Observable.prototype.using
API DocumentObservable.prototype.using
Source Code
まだ Disposable
についてまったく触れていないので、あまり書きたくないのですけど……。
.NET ではおなじみの using
構文に近いもの。Java だと try (...) {}
が近いです。
動作としては Disposable
を必ず dispose
してくれるはずです。 subscribe
が返す Disposable
(Rx 的には Subscription
と呼ばれているはず……) を明示的に dispose
すると良いはずです。
import { Observable } from 'rx';
class MyDisposable {
dispose() {
console.log('disposed');
}
}
Observable
.using(
() => new MyDisposable(),
(resource) => {
// ... use disposable resource ...
return Observable.empty();
}
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onCompleted
// disposed
この例では明示的に dispose
はしていませんが、dispose
が呼ばれているので、おそらく onCompleted
で dispose
されていると思います (未確認) 。
また Disposable
関連のコードを読むときに触れるつもりです。
Observable.prototype.materialize
/ Observable.prototype.dematerialize
- ReactiveX - Materialize/Dematerialize operator
Observable.prototype.materialize
API DocumentObservable.prototype.materialize
Source CodeObservable.prototype.dematerialize
API DocumentObservable.prototype.dematerialize
Source Code
onNext
や onCompleted
や onError
自体を操作できるようです。説明が難しいので例を挙げます。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.materialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: OnNext(1)
// onNext: OnNext(2)
// onNext: OnNext(3)
// onNext: OnCompleted()
// onCompleted
ここで流れてきているのは Rx.Notification
クラスのインスタンスです。OnNext
/ OnCompleted
/ OnError
があります。OnNext
は value
を OnError
は error
を持ちます。
materialize
は Observalbe
を流れるデータを Notification
に変換します。 dematerialize
はその逆を行います。
今度は、明示的に Notification
の Observable
を dematerialize
してみましょう。
import { Observable, Notification } from 'rx';
Observable
.from([
Notification.createOnNext(4),
Notification.createOnNext(5),
Notification.createOnCompleted(),
Notification.createOnNext(6)
])
.dematerialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 4
// onNext: 5
// onCompleted
onCompleted
のあとにはデータは流れないようです。当然そうですよね。
メタな操作をしたくなったときに使えるのかもしれません。
おわりに
今日は昨日に続き Observable
のユーティリティを見ました。知っておくと便利な場面があるかもしれませんね。