この記事は bouzuya's RxJS Advent Calendar 2015 の 16 日目かつ RxJS Advent Calendar 2015 の 16 日目です。
はじめに
今日は Subject
とその種類を見ていきます。
詳細は RxJS 4.0.7 を参照してください。
サンプルコードは以下のコマンドで試しています。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
Subject
Subject とは
Observer
としても Observable
としても動くクラスです。
任意のタイミングでの onNext
Observable.from
などは subscribe
されてはじめて動作します。Subject
の場合は subscribe
の有無に関わらず、任意のタイミングで Subject.prototype.onNext
(Observer.prototype.onNext
) により値を流すことができます。
subscribe
の有無に関わらず値が流れる挙動などを指して Cold とか Hot とか呼ぶはずなのですが、ぼくは分かりづらいと感じるのであえて避けていきます。
1 Observable N Observer
昨日の FromObservable
などでは 1 Observable 1 Observer になります。Subject
では 1 Observable N Observer になります。
あるいは FromObservable
などでは subscribe
ごとに値を流しはじめますが、Subject
では subscribe
は observer
を登録して onNext
ごとに値を流すと言えばいいのかもしれません。
N Observer になるのが分かりやすいのが Subject.prototype.hasObservers
というメソッドです。 Observer
が登録されているかが分かります。
それぞれの subscribe
実装のイメージは次の通りです。
class FromObservable {
subscribe(observer) {
this.array.forEach(value => observer.onNext(value));
}
}
class Subject {
subscribe(observer) {
this.observers.push(observer);
}
onNext(value) {
this.observers.forEach(observer => observer.onNext(value));
}
hasObservers() {
return this.observers.length > 0;
}
}
Subject の種類
いくつかの種類があり、挙動がすこしずつ異なります。
Subject
onNext
に渡されたデータをそのまま observers
に流します。
import { Subject } from 'rx';
const subject = new Subject();
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
subscribe
のタイミング次第で値の取りこぼしがでてきます。Observable.from
では subscribe
で値を流しはじめるのですが、Subject
では無関係に onNext
で値が流れます。
(個人的には厳密には Observable.from
は Push とは言えないと思えないのですが、どうなのでしょう……?)
AsyncSubject
AsyncSubject
は最後に流れた値を保持しており、onCompleted
でその値を流します。subscribe
タイミングですでに onCompleted
な場合はその値を流します。
import { AsyncSubject } from 'rx';
const subject = new AsyncSubject();
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// onNext(100)
// onNext(200)
// onCompleted()
// 1 onNext: 200
// 1 onCompleted
// 2 onNext: 200
// 2 onCompleted
onCompleted
を待ちたい場合に良いかもしれません。
BehaviorSubject
初期値または直近の値を保持しており subscribe
および onNext
のタイミングでその値を流します。
import { BehaviorSubject } from 'rx';
const subject = new BehaviorSubject(500);
console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onNext(200)');
subject.onNext(200);
console.log('onCompleted()');
subject.onCompleted();
// subscribe 1
// 1 onNext: 500
// onNext(100)
// 1 onNext: 100
// subscribe 2
// 2 onNext: 100
// onNext(200)
// 1 onNext: 200
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
ReplaySubject
指定された個数だけ値をバッファにためこんで subscribe
タイミングでそれらの値を流します。
import { ReplaySubject } from 'rx';
const subject = new ReplaySubject();
console.log('subscribe 1');
subject
.subscribe(
value => console.log(`1 onNext: ${value}`),
error => console.log(`1 onError: ${error}`),
() => console.log('1 onCompleted')
);
console.log('onNext(100)');
subject.onNext(100);
console.log('onNext(200)');
subject.onNext(200);
console.log('subscribe 2');
subject
.subscribe(
value => console.log(`2 onNext: ${value}`),
error => console.log(`2 onError: ${error}`),
() => console.log('2 onCompleted')
);
console.log('onCompleted()');
subject.onCompleted();
// subscribe 1
// onNext(100)
// 1 onNext: 100
// onNext(200)
// 1 onNext: 200
// subscribe 2
// 2 onNext: 100
// 2 onNext: 200
// onCompleted()
// 1 onCompleted
// 2 onCompleted
bufferSize
を 1
にすれば、初期値指定のない BehaviorSubject
のようになります。
おわりに
今日は Subject
やその種類について確認しました。
明日以降に ConnectableObservable
や Cold / Hot 変換関連のメソッドの実装を読むための下準備です。