ReactiveExtensions
RxJS
Rx
RxJSDay 16

RxJS の Subject とその種類

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 16 日目かつ RxJS Advent Calendar 2015 の 16 日目です。


はじめに

今日は Subject とその種類を見ていきます。

詳細は RxJS 4.0.7 を参照してください。

サンプルコードは以下のコマンドで試しています。

$ npm i rx babel-cli babel-preset-es2015

$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js


Subject


Subject とは

Observer としても Observable としても動くクラスです。


任意のタイミングでの onNext

Observable.from などは subscribe されてはじめて動作します。Subject の場合は subscribe の有無に関わらず、任意のタイミングで Subject.prototype.onNext (Observer.prototype.onNext) により値を流すことができます。

subscribe の有無に関わらず値が流れる挙動などを指して Cold とか Hot とか呼ぶはずなのですが、ぼくは分かりづらいと感じるのであえて避けていきます。


1 Observable N Observer

昨日の FromObservable などでは 1 Observable 1 Observer になります。Subject では 1 Observable N Observer になります。

あるいは FromObservable などでは subscribe ごとに値を流しはじめますが、Subject では subscribeobserver を登録して onNext ごとに値を流すと言えばいいのかもしれません。

N Observer になるのが分かりやすいのが Subject.prototype.hasObservers というメソッドです。 Observer が登録されているかが分かります。

それぞれの subscribe 実装のイメージは次の通りです。

class FromObservable {

subscribe(observer) {
this.array.forEach(value => observer.onNext(value));
}
}

class Subject {

subscribe(observer) {
this.observers.push(observer);
}

onNext(value) {
this.observers.forEach(observer => observer.onNext(value));
}

hasObservers() {
return this.observers.length > 0;
}
}


Subject の種類

いくつかの種類があり、挙動がすこしずつ異なります。


Subject

onNext に渡されたデータをそのまま observers に流します。

import { Subject } from 'rx';

const subject = new Subject();

subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);

console.log('onNext(100)');
subject.onNext(100);

subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted

subscribe のタイミング次第で値の取りこぼしがでてきます。Observable.from では subscribe で値を流しはじめるのですが、Subject では無関係に onNext で値が流れます。

(個人的には厳密には Observable.from は Push とは言えないと思えないのですが、どうなのでしょう……?)


AsyncSubject

AsyncSubject は最後に流れた値を保持しており、onCompleted でその値を流します。subscribe タイミングですでに onCompleted な場合はその値を流します。

import { AsyncSubject } from 'rx';

const subject = new AsyncSubject();

subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);

console.log('onNext(100)');
subject.onNext(100);

subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// onNext(100)
// onNext(200)
// onCompleted()
// 1 onNext: 200
// 1 onCompleted
// 2 onNext: 200
// 2 onCompleted

onCompleted を待ちたい場合に良いかもしれません。


BehaviorSubject

初期値または直近の値を保持しており subscribe および onNext のタイミングでその値を流します。

import { BehaviorSubject } from 'rx';

const subject = new BehaviorSubject(500);

console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);

console.log('onNext(100)');
subject.onNext(100);

console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);

console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();

// subscribe 1
// 1 onNext: 500
// onNext(100)
// 1 onNext: 100
// subscribe 2
// 2 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted


ReplaySubject

指定された個数だけ値をバッファにためこんで subscribe タイミングでそれらの値を流します。

import { ReplaySubject } from 'rx';

const subject = new ReplaySubject();

console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);

console.log('onNext(100)');
subject.onNext(100);
console.log('onNext(200)');
subject.onNext(200);

console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);

console.log('onCompleted()');
subject.onCompleted();

// subscribe 1
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// subscribe 2
// 2 onNext: 100
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted

bufferSize1 にすれば、初期値指定のない BehaviorSubject のようになります。


おわりに

今日は Subject やその種類について確認しました。

明日以降に ConnectableObservable や Cold / Hot 変換関連のメソッドの実装を読むための下準備です。