本章の目的
observable
の骨格を使って、 Rx
の代表的なオペレータ map
を紐解きます。
map オペレータ
map
オペレータは、値を変換する際に使用します。
実際、業務アプリを作っていると頻出のオペレータです。
例えば、 observable
が文字列を発行して、それを JSON オブジェクトとして処理したい時に、この map
を使います。
と、ここで rxjs
のサンプルを書くと pipe
というのがどうにもノイズになるので、 pipe
の紹介をするまでは、 observable
の骨格を使って説明します。
で、序章で使っていたサンプルの非同期関数は1秒後に値を倍にするというものでした。
この値に100を加算する map を作ってみようと思います。
ゴールはこんな感じです。
rxAsyncFn(1)
.map((x)=>{
return x + 100;
})
.subscribe({
next: (r) => {
console.log(r); /* 102 */
},
complete: () => {
console.log("complete");
}
});
map オペレータの実装
早速、 map
オペレータを実装を見てみましょう。
type Subscriber<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
/* 今回追加実装した map オペレータ */
public map<R>(fn: (value: T) => R): Observable<R> {
return new Observable<R>((subscriber) => {
this.m_subscribeFn({
next: (r) => {
if(subscriber.next) subscriber.next(fn(r));
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
}
ちょっと、複雑かな?
でも、 return new Observable
って何だか見覚えありませんか?
これ、非同期関数を observable
にする時に書いたコードですよね。
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
if(subscriber.next) subscriber.next(r);
if(subscriber.complete) subscriber.complete();
});
});
}
それと同じで、 map
オペレータ、つまり、 map()
関数はオブザーバブルを返却するのです。
つまり、 Rx
のオペレータは全て observable
を返却します。
Rx
の世界は全てが observable
なのです。
と、刷り込みはこのくらいにして。。。
この map()
は関数 fn
を引数に取ります。この関数 fn
は、 observable
から発行された値を加工する関数です。
rxAsyncFn(1)
.map((x)=>{
return x + 100;
})
では、改めて observable
の map()
を見てみると
/* 今回追加実装した map オペレータ */
public map<R>(fn: (value: T) => R): Observable<R> {
return new Observable<R>((subscriber) => {
this.subscribe({
next: (r) => {
if(subscriber.next) subscriber.next(fn(r));
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
observable
自身を subscribe
しているのが分かりますか?
そうなんです。
この map()
は自分自身を観測して、その観測して、観測された値を fn
で変換して、その値を発行するんです。
何だか、バケツリレーやっている感じで面白くないですか?
で、この「自分自身を観測」というのは、全てのオペレータで共通します。
そして、観測したら「何をする」かが違うのです。
実装した map を動かしてみる
type Subscriber<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
/* 今回追加実装した map オペレータ */
public map<R>(fn: (value: T) => R): Observable<R> {
return new Observable<R>((subscriber) => {
this.subscribe({
next: (r) => {
if(subscriber.next) subscriber.next(fn(r));
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
}
/* ベタな非同期関数 */
function asyncFn(a: number, callback: (r: number) => void){
setTimeout(() => {
callback(a * 2);
}, 1000);
}
/* 非同期関数を observable に変換する関数 */
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
if(subscriber.next) subscriber.next(r);
if(subscriber.complete) subscriber.complete();
});
});
}
/* メイン処理 */
rxAsyncFn(1)
.map((x)=>{
return x + 100;
})
.subscribe({
next: (r) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
102
complete
まとめ
本章では、一番基本の map
オペレータについて深堀してみました。
Rx
初心者のコードを見ると、細かく subscribe
をして処理をつなげる的なコードを書いているのを見かけたりするのですが、この辺りの仕組みが分かると、一連の処理は複数の observable
とオペレータの連鎖で構成されて、1つの subscribe
で完結することができる、と、いうかそうあるべきじゃないかと思う訳です。なぜなら、その subscribe
して処理をつなげるというのは、 Rx
が提供している数多のオペレータで実現できるからです。 1
さて、次は mergeMap
です。
rxcpp
では flat_map
なのですが、数年前、「自分は千代田区で一番 flat_map
をキーボードタイプしているんじゃないか?」 2 と思うくらい頻出 3 のオペレータです。