本章の目的
本章では、 observable
の骨格を、実際にコードを作ってみてイメージしてもらおうと思います。
そして、 observable
骨格を知ることで、後に登場する map
や mergeMap
など、各種オペレータや、スケジューラ、その他 Rx
を理解して使うためには必須というか、近道なんじゃないかと思う訳です。
そんなに、構える必要はありません。
ちょっと、パズルを解くイメージで考えてみてください。
observable の骨格
序章にて observable
を紹介させていただきました。
そして、 Rx
の世界では全ては observable
でしたよね。
さて、この observable
って、中身がどうなっているか気になりません?
気にならないから、さっさとオペレータ教えろとか言われそうですが、少し我慢してくださいませ。
実は、この observable
が一体何者なのかを知ることは、様々なオペレータや挙動を理解する一番の近道だと思うのです。
というのも、これだけなんです。
type Subscriber<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
}
observable
の骨格はこれだけです。 1
十数行で記述できるレベルです。
type Subscriber<T>
序章では observable
が3種類の発行をすると述べました。
- 値(next)
- エラー(error)
- 終了(complete)
「発行する」とは「関数を呼び出す」ことです。
つまり、この type Subscriber<T>
という型は、値発行、エラー発行、終了発行の際に呼び出す関数を入れておく器になります。( interface Subscriber<T>
で定義しても等価)
各プロパティ( next
, error
, complete
)はオプショナルなので、観測する時にその発行を観測したくなければ、 undefined にすれば良い訳です。 2
class Observable<T>
さて、いよいよ本丸の observable
です。
このクラス、よ~~~く見てください。
この子は m_subscribeFn
というプロパティを持っています。このプロパティは関数型ですね。
で、このプロパティは observable
のコンストラクタで初期化されます。
思い出しました?
序章で登場した rxAsyncFn()
の new Observable
の引数そのものです!
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
if(subscriber.next) subscriber.next(r);
if(subscriber.complete) subscriber.complete();
});
});
}
そして、その関数はいつ呼び出されるのでしょうか?
そう! 観測 subscribe
する時です。
ということで、この observable
には subscribe()
が定義されていますね。
見てみましょう。
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
お~、こんだけです!
簡単でしょ?
騙された感じですか?(苦笑)
observable の骨格を動かしてみる
ということで、実際に試してみましょう。
type Subscriber<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
}
/* 非同期関数 */
function asyncFn(a: number, callback: (r: number) => void){
setTimeout(() => {
callback(a * 2);
}, 1000);
}
/* 非同期関数を observable にする関数 */
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
if(subscriber.next) subscriber.next(r);
if(subscriber.complete) subscriber.complete();
});
});
}
/* メインコード */
rxAsyncFn(1)
.subscribe({
next: (r) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
序章との違いは、 observable
の実装が露わになっていることだけで、他は全く同じです。
ここで改めて rxAsyncFn()
で observable
が生成されて、 subscribe
して next
や complete
が呼び出されるのをイメージしてください。
ちなみに、私のイメージはこんな感じです。
まとめ
本章では、実際に observable
の骨格を作ってみて、 Rx
の世界でプログラムがどういう仕組みで動いているのかをイメージしていただければと思います。
Rx
というパラダイムには様々なオペレータやスケジューラなど機能が豊富で自由度も高いです。
しかし、これらの機能は、その挙動を理解して使用しないと後々「謎な挙動」に遭遇し、やっぱ Rx
は意味不明で難しすぎるとか、挫折の原因になっているように見られます。更に、同期プログラミングと比較するとデバッグが難しいです。 3
偉そうにこんな記事書いている私ですが、作ってみたものの「およよ?!」と思う挙動が出て悩むことが未だにあります。
そんなときは、このイメージに立ち戻って考えると、自ずと答えが出てくることが多いです。
次回は、この自作 observable
を使って、 map
と mergeMap
を追加してみて、その違いをイメージしていただこうかと思います。私の周りを見てみると Rx
で最初に躓くポイントが map
と mergeMap
の違いじゃないかと思うのですが、この observable
の骨格をイメージできれば、実は簡単だったりします。