4
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

RxJS の Operators (8) - Observable Utility Operators (2)

この記事は bouzuya's RxJS Advent Calendar 2015 の 10 日目かつ RxJS Advent Calendar 2015 の 10 日目です。

はじめに

今日は昨日に続いて ReactiveX の Observable Utility Operators について RxJS の API ドキュメントやサンプルコードを書いていきます。

また RxJS 4.0.7 を対象にしています。

Observable.prototype.timeInterval

値 (value) とその前の値との間隔 (interval) とに変換します。後述の timestamp と似ています。

import { Observable } from 'rx';

Observable
  .timer(0, 1000)
  .timeInterval()
  .map(({ value, interval }) => `${value}:${interval}`)
  .take(4)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onNext: 0:2
// onNext: 1:1002
// onNext: 2:999
// onNext: 3:997
// onCompleted

Observable.prototype.timestamp

値 (value) とそのタイムスタンプ (timestamp) とに変換します。前述の timestamp と似ています。

import { Observable } from 'rx';

Observable
  .timer(0, 1000)
  .timestamp()
  .map(({ value, timestamp }) => `${value}:${timestamp}`)
  .take(4)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onNext: 0:1449755362020
// onNext: 1:1449755363025
// onNext: 2:1449755364023
// onNext: 3:1449755365022
// onCompleted

Observable.prototype.timeout

指定した時間よりも interval があいた場合にタイムアウトとしてエラーにします。

import { Observable } from 'rx';

Observable
  .concat(
    Observable.timer(100),
    Observable.timer(200),
    Observable.timer(300),
    Observable.timer(400),
    Observable.timer(500)
  )
  .timeInterval()
  .map(({ value, interval }) => `${value}:${interval}`)
  .timeout(350)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onNext: 0:106
// onNext: 0:224
// onNext: 0:306
// onError: TimeoutError: Timeout has occurred

個人的には Rx のエラーハンドリングはそんなに便利だと思えないので、積極的に使いたいとは思えないのですが……。

Observable.using

まだ Disposable についてまったく触れていないので、あまり書きたくないのですけど……。

.NET ではおなじみの using 構文に近いもの。Java だと try (...) {} が近いです。

動作としては Disposable を必ず dispose してくれるはずです。 subscribe が返す Disposable (Rx 的には Subscription と呼ばれているはず……) を明示的に dispose すると良いはずです。

import { Observable } from 'rx';

class MyDisposable {
  dispose() {
    console.log('disposed');
  }
}

Observable
  .using(
    () => new MyDisposable(),
    (resource) => {
      // ... use disposable resource ...
      return Observable.empty();
    }
  )
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onCompleted
// disposed

この例では明示的に dispose はしていませんが、dispose が呼ばれているので、おそらく onCompleteddispose されていると思います (未確認) 。

また Disposable 関連のコードを読むときに触れるつもりです。

Observable.prototype.materialize / Observable.prototype.dematerialize

onNextonCompletedonError 自体を操作できるようです。説明が難しいので例を挙げます。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  .materialize()
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onNext: OnNext(1)
// onNext: OnNext(2)
// onNext: OnNext(3)
// onNext: OnCompleted()
// onCompleted

ここで流れてきているのは Rx.Notification クラスのインスタンスです。OnNext / OnCompleted / OnError があります。OnNextvalueOnErrorerror を持ちます。

materializeObservalbe を流れるデータを Notification に変換します。 dematerialize はその逆を行います。

今度は、明示的に NotificationObservabledematerialize してみましょう。

import { Observable, Notification } from 'rx';

Observable
  .from([
    Notification.createOnNext(4),
    Notification.createOnNext(5),
    Notification.createOnCompleted(),
    Notification.createOnNext(6)
  ])
  .dematerialize()
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log('onCompleted')
  );
// onNext: 4
// onNext: 5
// onCompleted

onCompleted のあとにはデータは流れないようです。当然そうですよね。

メタな操作をしたくなったときに使えるのかもしれません。

おわりに

今日は昨日に続き Observable のユーティリティを見ました。知っておくと便利な場面があるかもしれませんね。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
4
Help us understand the problem. What are the problem?