この記事は bouzuya's RxJS Advent Calendar 2015 の 16 日目かつ RxJS Advent Calendar 2015 の 16 日目です。
はじめに
今日は Subject とその種類を見ていきます。
詳細は RxJS 4.0.7 を参照してください。
サンプルコードは以下のコマンドで試しています。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
Subject
Subject とは
Observer としても Observable としても動くクラスです。
任意のタイミングでの onNext
Observable.from などは subscribe されてはじめて動作します。Subject の場合は subscribe の有無に関わらず、任意のタイミングで Subject.prototype.onNext (Observer.prototype.onNext) により値を流すことができます。
subscribe の有無に関わらず値が流れる挙動などを指して Cold とか Hot とか呼ぶはずなのですが、ぼくは分かりづらいと感じるのであえて避けていきます。
1 Observable N Observer
昨日の FromObservable などでは 1 Observable 1 Observer になります。Subject では 1 Observable N Observer になります。
あるいは FromObservable などでは subscribe ごとに値を流しはじめますが、Subject では subscribe は observer を登録して onNext ごとに値を流すと言えばいいのかもしれません。
N Observer になるのが分かりやすいのが Subject.prototype.hasObservers というメソッドです。 Observer が登録されているかが分かります。
それぞれの subscribe 実装のイメージは次の通りです。
class FromObservable {
subscribe(observer) {
this.array.forEach(value => observer.onNext(value));
}
}
class Subject {
subscribe(observer) {
this.observers.push(observer);
}
onNext(value) {
this.observers.forEach(observer => observer.onNext(value));
}
hasObservers() {
return this.observers.length > 0;
}
}
Subject の種類
いくつかの種類があり、挙動がすこしずつ異なります。
Subject
onNext に渡されたデータをそのまま observers に流します。
import { Subject } from 'rx';
const subject = new Subject();
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
subscribe のタイミング次第で値の取りこぼしがでてきます。Observable.from では subscribe で値を流しはじめるのですが、Subject では無関係に onNext で値が流れます。
(個人的には厳密には Observable.from は Push とは言えないと思えないのですが、どうなのでしょう……?)
AsyncSubject
AsyncSubject は最後に流れた値を保持しており、onCompleted でその値を流します。subscribe タイミングですでに onCompleted な場合はその値を流します。
import { AsyncSubject } from 'rx';
const subject = new AsyncSubject();
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// onNext(100)
// onNext(200)
// onCompleted()
// 1 onNext: 200
// 1 onCompleted
// 2 onNext: 200
// 2 onCompleted
onCompleted を待ちたい場合に良いかもしれません。
BehaviorSubject
初期値または直近の値を保持しており subscribe および onNext のタイミングでその値を流します。
import { BehaviorSubject } from 'rx';
const subject = new BehaviorSubject(500);
console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// subscribe 1
// 1 onNext: 500
// onNext(100)
// 1 onNext: 100
// subscribe 2
// 2 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
ReplaySubject
指定された個数だけ値をバッファにためこんで subscribe タイミングでそれらの値を流します。
import { ReplaySubject } from 'rx';
const subject = new ReplaySubject();
console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
console.log('onNext(200)');
subject.onNext(200);
console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onCompleted()');
subject.onCompleted();
// subscribe 1
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// subscribe 2
// 2 onNext: 100
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
bufferSize を 1 にすれば、初期値指定のない BehaviorSubject のようになります。
おわりに
今日は Subject やその種類について確認しました。
明日以降に ConnectableObservable や Cold / Hot 変換関連のメソッドの実装を読むための下準備です。