ReactiveExtensions
RxJS
Rx
RxJSDay 10

RxJS の Operators (8) - Observable Utility Operators (2)

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 10 日目かつ RxJS Advent Calendar 2015 の 10 日目です。


はじめに

今日は昨日に続いて ReactiveX の Observable Utility Operators について RxJS の API ドキュメントやサンプルコードを書いていきます。

また RxJS 4.0.7 を対象にしています。


Observable.prototype.timeInterval

値 (value) とその前の値との間隔 (interval) とに変換します。後述の timestamp と似ています。

import { Observable } from 'rx';

Observable
.timer(0, 1000)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:2
// onNext: 1:1002
// onNext: 2:999
// onNext: 3:997
// onCompleted


Observable.prototype.timestamp

値 (value) とそのタイムスタンプ (timestamp) とに変換します。前述の timestamp と似ています。

import { Observable } from 'rx';

Observable
.timer(0, 1000)
.timestamp()
.map(({ value, timestamp }) => `${value}:${timestamp}`)
.take(4)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:1449755362020
// onNext: 1:1449755363025
// onNext: 2:1449755364023
// onNext: 3:1449755365022
// onCompleted


Observable.prototype.timeout

指定した時間よりも interval があいた場合にタイムアウトとしてエラーにします。

import { Observable } from 'rx';

Observable
.concat(
Observable.timer(100),
Observable.timer(200),
Observable.timer(300),
Observable.timer(400),
Observable.timer(500)
)
.timeInterval()
.map(({ value, interval }) => `${value}:${interval}`)
.timeout(350)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0:106
// onNext: 0:224
// onNext: 0:306
// onError: TimeoutError: Timeout has occurred

個人的には Rx のエラーハンドリングはそんなに便利だと思えないので、積極的に使いたいとは思えないのですが……。


Observable.using

まだ Disposable についてまったく触れていないので、あまり書きたくないのですけど……。

.NET ではおなじみの using 構文に近いもの。Java だと try (...) {} が近いです。

動作としては Disposable を必ず dispose してくれるはずです。 subscribe が返す Disposable (Rx 的には Subscription と呼ばれているはず……) を明示的に dispose すると良いはずです。

import { Observable } from 'rx';

class MyDisposable {
dispose() {
console.log('disposed');
}
}

Observable
.using(
() => new MyDisposable(),
(resource) => {
// ... use disposable resource ...
return Observable.empty();
}
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onCompleted
// disposed

この例では明示的に dispose はしていませんが、dispose が呼ばれているので、おそらく onCompleteddispose されていると思います (未確認) 。

また Disposable 関連のコードを読むときに触れるつもりです。


Observable.prototype.materialize / Observable.prototype.dematerialize

onNextonCompletedonError 自体を操作できるようです。説明が難しいので例を挙げます。

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
.materialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: OnNext(1)
// onNext: OnNext(2)
// onNext: OnNext(3)
// onNext: OnCompleted()
// onCompleted

ここで流れてきているのは Rx.Notification クラスのインスタンスです。OnNext / OnCompleted / OnError があります。OnNextvalueOnErrorerror を持ちます。

materializeObservalbe を流れるデータを Notification に変換します。 dematerialize はその逆を行います。

今度は、明示的に NotificationObservabledematerialize してみましょう。

import { Observable, Notification } from 'rx';

Observable
.from([
Notification.createOnNext(4),
Notification.createOnNext(5),
Notification.createOnCompleted(),
Notification.createOnNext(6)
])
.dematerialize()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 4
// onNext: 5
// onCompleted

onCompleted のあとにはデータは流れないようです。当然そうですよね。

メタな操作をしたくなったときに使えるのかもしれません。


おわりに

今日は昨日に続き Observable のユーティリティを見ました。知っておくと便利な場面があるかもしれませんね。