85
75

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJSAdvent Calendar 2015

Day 16

RxJS の Subject とその種類

Last updated at Posted at 2015-12-16

この記事は bouzuya's RxJS Advent Calendar 2015 の 16 日目かつ RxJS Advent Calendar 2015 の 16 日目です。

はじめに

今日は Subject とその種類を見ていきます。

詳細は RxJS 4.0.7 を参照してください。

サンプルコードは以下のコマンドで試しています。

$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js

Subject

Subject とは

Observer としても Observable としても動くクラスです。

任意のタイミングでの onNext

Observable.from などは subscribe されてはじめて動作します。Subject の場合は subscribe の有無に関わらず、任意のタイミングで Subject.prototype.onNext (Observer.prototype.onNext) により値を流すことができます。

subscribe の有無に関わらず値が流れる挙動などを指して Cold とか Hot とか呼ぶはずなのですが、ぼくは分かりづらいと感じるのであえて避けていきます。

1 Observable N Observer

昨日の FromObservable などでは 1 Observable 1 Observer になります。Subject では 1 Observable N Observer になります。

あるいは FromObservable などでは subscribe ごとに値を流しはじめますが、Subject では subscribeobserver を登録して onNext ごとに値を流すと言えばいいのかもしれません。

N Observer になるのが分かりやすいのが Subject.prototype.hasObservers というメソッドです。 Observer が登録されているかが分かります。

それぞれの subscribe 実装のイメージは次の通りです。

class FromObservable {
  subscribe(observer) {
    this.array.forEach(value => observer.onNext(value));
  }
}
class Subject {
  subscribe(observer) {
    this.observers.push(observer);
  }
  
  onNext(value) {
    this.observers.forEach(observer => observer.onNext(value));
  }
  
  hasObservers() {
    return this.observers.length > 0;
  }
}

Subject の種類

いくつかの種類があり、挙動がすこしずつ異なります。

Subject

onNext に渡されたデータをそのまま observers に流します。

import { Subject } from 'rx';

const subject = new Subject();

subject
  .subscribe(
    value => console.log(`1 onNext: ${value}`),
    error => console.log(`1 onError: ${error}`),
    () => console.log('1 onCompleted')
  );

console.log('onNext(100)');
subject.onNext(100);

subject
  .subscribe(
    value => console.log(`2 onNext: ${value}`),
    error => console.log(`2 onError: ${error}`),
    () => console.log('2 onCompleted')
  );

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted

subscribe のタイミング次第で値の取りこぼしがでてきます。Observable.from では subscribe で値を流しはじめるのですが、Subject では無関係に onNext で値が流れます。

(個人的には厳密には Observable.from は Push とは言えないと思えないのですが、どうなのでしょう……?)

AsyncSubject

AsyncSubject は最後に流れた値を保持しており、onCompleted でその値を流します。subscribe タイミングですでに onCompleted な場合はその値を流します。

import { AsyncSubject } from 'rx';

const subject = new AsyncSubject();

subject
  .subscribe(
    value => console.log(`1 onNext: ${value}`),
    error => console.log(`1 onError: ${error}`),
    () => console.log('1 onCompleted')
  );

console.log('onNext(100)');
subject.onNext(100);

subject
  .subscribe(
    value => console.log(`2 onNext: ${value}`),
    error => console.log(`2 onError: ${error}`),
    () => console.log('2 onCompleted')
  );

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// onNext(100)
// onNext(200)
// onCompleted()
// 1 onNext: 200
// 1 onCompleted
// 2 onNext: 200
// 2 onCompleted

onCompleted を待ちたい場合に良いかもしれません。

BehaviorSubject

初期値または直近の値を保持しており subscribe および onNext のタイミングでその値を流します。

import { BehaviorSubject } from 'rx';

const subject = new BehaviorSubject(500);

console.log('subscribe 1');
subject
  .subscribe(
    value => console.log(`1 onNext: ${value}`),
    error => console.log(`1 onError: ${error}`),
    () => console.log('1 onCompleted')
  );

console.log('onNext(100)');
subject.onNext(100);

console.log('subscribe 2');
subject
  .subscribe(
    value => console.log(`2 onNext: ${value}`),
    error => console.log(`2 onError: ${error}`),
    () => console.log('2 onCompleted')
  );

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// subscribe 1
// 1 onNext: 500
// onNext(100)
// 1 onNext: 100
// subscribe 2
// 2 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted

ReplaySubject

指定された個数だけ値をバッファにためこんで subscribe タイミングでそれらの値を流します。

import { ReplaySubject } from 'rx';

const subject = new ReplaySubject();

console.log('subscribe 1');
subject
  .subscribe(
    value => console.log(`1 onNext: ${value}`),
    error => console.log(`1 onError: ${error}`),
    () => console.log('1 onCompleted')
  );

console.log('onNext(100)');
subject.onNext(100);
console.log('onNext(200)');
subject.onNext(200);

console.log('subscribe 2');
subject
  .subscribe(
    value => console.log(`2 onNext: ${value}`),
    error => console.log(`2 onError: ${error}`),
    () => console.log('2 onCompleted')
  );

console.log('onCompleted()');
subject.onCompleted();

// subscribe 1
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// subscribe 2
// 2 onNext: 100
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted

bufferSize1 にすれば、初期値指定のない BehaviorSubject のようになります。

おわりに

今日は Subject やその種類について確認しました。

明日以降に ConnectableObservable や Cold / Hot 変換関連のメソッドの実装を読むための下準備です。

85
75
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
85
75

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?