本章の目的
Rx
でアプリを作っていると一番使われるであろう mergeMap
について説明します。
また、前章の map
と mergeMap
の違いについてイメージしていただこうと思います。
mergeMap オペレータ
Rx
の本家では FlatMap
としています。
Rx
では、ポーティングされた言語によって名称が異なることがありますが、あまり気にしないようにしましょう。 1
さて、この mergeMap
はどんなオペレータかと言うと、 observable
で発行された値を使って新たな observable
を生成して、後続の subscribe
はその 新たな observable
を観測することになります。
と、言われてもピンとこないんじゃないかな?
具体的には、 observable
で発行された値を使って 2 、非同期処理を呼び出す時に使います。
言葉よりもコード見た方が早いかもですね。
mergeMap オペレータの実装
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
/* 今回追加実装した mergeMap オペレータ */
public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
return new Observable<R>((subscriber) => {
this.subscribe({
next: (r) => {
fn(r).subscribe({
next: (r2) => {
if(subscriber.next) subscriber.next(r2);
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
})
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
}
これが、 mergeMap
の実装になります。 3
map
よりも複雑になりましたが、自分自身を subscribe
するところまでは全く同じです。
比較のために前章の map
の実装部を引用します。
/* 今回追加実装した map オペレータ */
public map<R>(fn: (value: T) => R): Observable<R> {
return new Observable<R>((subscriber) => {
this.m_subscribeFn({
next: (r) => {
if(subscriber.next) subscriber.next(fn(r));
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
まず mergeMap
と map
の引数 fn
に着目しましょう。
map
の fn
は (value: T) => R
となっていて、 T
型の値を受け取り R
型の値を返却する関数でした。
これに対して、 mergeMap
の fn
は (value: T) => Observable<R>
となっていて、 T
型の値を受け取り R
型を発行する observable
を返却する関数になります。
次に、 自分自身を subscribe
して観測された値をどうするかに着目しましょう。
map
は、観測された値 r
を fn
の引数にして、 fn
から返却された値を next
で発行します。
これに対して、 mergeMap
は、観測された値 r
を fn
の引数にして、 fn
から返却された observable
を subscribe
し、そこで観測された値を next
で発行します。
どうですか? 違いが分かりますか?
慣れないと何が何だかわからないと思いますので、納得できるまで、何度か繰り返し比較してください。
ちなみに、気付いた方もいると思いますが、 complete
が2箇所登場していて、変ですよね? これは後に complete
についてフォーカスして説明します。というのも、 complete
というのが、実は Rx
を使う上で極めて重要な「気にするポイント」なのですが、ここで混ぜて説明すると複雑になるために、今回はイメージ的なコードになっています。
実装した mergeMap を動かしてみる
type Subscriber<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void;
public constructor(subscribe: (subscriber: Subscriber<T>) => void){
this.m_subscribeFn = subscribe;
}
public subscribe(s: Subscriber<T>): void{
this.m_subscribeFn(s);
}
/* 今回追加実装した mergeMap オペレータ */
public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
return new Observable<R>((subscriber) => {
this.subscribe({
next: (r) => {
fn(r).subscribe({
next: (r2) => {
if(subscriber.next) subscriber.next(r2);
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
})
},
error: (err) => {
if(subscriber.error) subscriber.error(err);
},
complete: () => {
if(subscriber.complete) subscriber.complete();
}
});
});
}
}
/* ベタな非同期関数 */
function asyncFn(a: number, callback: (r: number) => void){
setTimeout(() => {
callback(a * 2);
}, 1000);
}
/* 非同期関数を observable に変換する関数 */
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
if(subscriber.next) subscriber.next(r);
if(subscriber.complete) subscriber.complete();
});
});
}
/* メイン処理 */
rxAsyncFn(1)
.mergeMap((r)=>{
return rxAsyncFn(r + 100);
})
.subscribe({
next: (r) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
complete
204
complete
あれれ? complete
が2回観測されましたね。
実際の rxjs
では complete
は 204
が発行された後に1回のみになります。
今回は、 mergeMap
が fn
によって生成された observable
を subscribe
するというイメージをしていただきたくてシンプルなコードにまとめたのですが、実際の mergeMap
はもう少し複雑な実装がされています。
今のところ、 observable
が発行する値について話しを進めていますが、後に error
と complete
にフォーカスを当ててそこで説明をします。特に complete
は Rx
を使う上で極めて重要な「気にするポイント」です。
ある程度ちゃんと ```observable``` を書こうとするとこんな感じになります
type SubscriberLike<T> = {
next?: (value: T) => void;
error?: (error: Error) => void;
complete?: () => void;
}
type Subscriber<T> = Required<SubscriberLike<T>>;
interface SubscriptionLike {
unsubscribe(): void;
readonly Closed: boolean;
}
type SubscriptionStatus = "next" | "error" | "complete";
class CascadeSubscription implements SubscriptionLike {
protected m_status: SubscriptionStatus = "next";
protected m_closed = false;
protected m_superSubscriptions: SubscriptionLike[] = [];
public get Closed() { return this.m_closed; }
public unsubscribe() {
this.m_closed = true;
this.m_superSubscriptions.forEach((x) => x.unsubscribe());
}
public add(superSubscription: SubscriptionLike){
this.m_superSubscriptions.push(superSubscription);
}
}
class InternalSubscription<T> extends CascadeSubscription implements Subscriber<T> {
private m_subscriber: SubscriberLike<T>;
constructor(subscriber: SubscriberLike<T>){
super();
this.m_subscriber = subscriber;
}
public next(value: T) {
if(this.Closed) return;
if(this.m_status == "next"){
if(this.m_subscriber.next) this.m_subscriber.next(value);
}
}
public error(error: Error) {
if(this.Closed) return;
if(this.m_status == "next"){
this.m_status = "error";
if(this.m_subscriber.error) this.m_subscriber.error(error);
}
}
public complete() {
if(this.Closed) return;
if(this.m_status == "next"){
this.m_status = "complete";
if(this.m_subscriber.complete) this.m_subscriber.complete();
}
}
}
class Observable<T> {
private m_subscribeFn: (subscriber: Subscriber<T>) => void | SubscriptionLike;
public constructor(subscribe: (subscriber: Subscriber<T>) => void | SubscriptionLike){
this.m_subscribeFn = subscribe;
}
public subscribe(s: SubscriberLike<T>): SubscriptionLike {
const subscription = new InternalSubscription<T>(s);
const innerSubscription = this.m_subscribeFn(subscription);
if(innerSubscription){
subscription.add(innerSubscription);
}
return subscription;
}
public map<R>(fn: (value: T) => R): Observable<R> {
return new Observable<R>((subscriber) => {
const subscription = new InternalSubscription<R>(subscriber);
const innnerSubscription = this.subscribe({
next: (r) => {
subscription.next(fn(r));
},
error: (error) => {
subscription.error(error);
},
complete: () => {
subscription.complete();
}
});
subscription.add(innnerSubscription);
return subscription;
});
}
public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
return new Observable<R>((subscriber) => {
const subscription = new InternalSubscription<R>(subscriber);
let fnCount = 0;
let bComplete = false;
const innnerSubscription = this.subscribe({
next: (r) => {
fnCount++;
const fnSubscription = fn(r).subscribe({
next: (value) => {
subscription.next(value);
},
error: (error) => {
subscription.error(error);
},
complete: () => {
fnCount--;
if(bComplete && fnCount == 0){
subscription.complete();
}
}
});
subscription.add(fnSubscription);
},
error: (error) => {
subscription.error(error);
},
complete: () => {
bComplete = true;
if(fnCount == 0){
subscription.complete();
}
}
});
subscription.add(innnerSubscription);
return subscription;
});
}
}
まとめ
mergeMap
と map
の違いが理解できましたか?
この mergeMap
は非同期処理を連鎖的に呼び出す際に頻出のオペレータです。
Rx
を使って複雑なコーディングをしている中で、このような内部的な構造や動きなんて意識しなくなります(これが Rx
の良いところでもあります)が、何か壁にぶつかったときには、オペレータがどう振舞うかをイメージすると適切な対応ができるようになると思います。
あ、そうそう、ここまで読んでいただくと 序章の「observableを連鎖させる」が何となく見えてくるんじゃないでしょうか。
次回は、 rxjs
の pipe
について説明します。
-
ちなみに、
rxjs
では、mergeMap
に似たものでswitchMap
やconcatMap
などがありますが挙動が微妙に違います。特にswitchMap
はmergeMap
に似ていますが、非同期処理時では挙動が大きく異なります。この違いを理解せず「どっちでも良いじゃん」的にswitchMap
を使われると後からメンテナーが見て「意図が伝わらない」というか無駄な心配をしないといけないので(switchMap
は連鎖したobservalbe
が値発行中でも上流の値発行により、以前のobservable
は後段に値発行しないため処理が抜けます。これを意図しているか悩むという意味です。)、適切なオペレータを使用するよう心掛けてください。 ↩ -
値の発行が非同期処理を呼び出すキッカケとする場合、値を使わないケースもあります。(案外多いです) ↩
-
rxjs
の実装はもっとケアすることが多く複雑です ↩