0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

CODIANZ流 Rx 解説・TS編 #4: mergeMap オペレータ

Last updated at Posted at 2021-03-15

本章の目的

Rx でアプリを作っていると一番使われるであろう mergeMap について説明します。
また、前章の mapmergeMap の違いについてイメージしていただこうと思います。

mergeMap オペレータ

Rx の本家では FlatMap としています。
Rx では、ポーティングされた言語によって名称が異なることがありますが、あまり気にしないようにしましょう。 1

さて、この mergeMap はどんなオペレータかと言うと、 observable で発行された値を使って新たな observable を生成して、後続の subscribe はその 新たな observable を観測することになります。
と、言われてもピンとこないんじゃないかな?

具体的には、 observable で発行された値を使って 2 、非同期処理を呼び出す時に使います。

言葉よりもコード見た方が早いかもですね。

mergeMap オペレータの実装

class Observable<T> {
  private m_subscribeFn: (subscriber: Subscriber<T>) => void;
  public constructor(subscribe:  (subscriber: Subscriber<T>) => void){
    this.m_subscribeFn = subscribe;
  }
  public subscribe(s: Subscriber<T>): void{
    this.m_subscribeFn(s);
  }
  /* 今回追加実装した mergeMap オペレータ */
  public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.subscribe({
        next: (r) => {
          fn(r).subscribe({
            next: (r2) => {
              if(subscriber.next) subscriber.next(r2);
            },
            error: (err) => {
              if(subscriber.error) subscriber.error(err);
            },
            complete: () => {
              if(subscriber.complete) subscriber.complete();
            }
          })
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }
}

これが、 mergeMap の実装になります。 3
map よりも複雑になりましたが、自分自身を subscribe するところまでは全く同じです。
比較のために前章の map の実装部を引用します。

mapの実装部
  /* 今回追加実装した map オペレータ */
  public map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.m_subscribeFn({
        next: (r) => {
          if(subscriber.next) subscriber.next(fn(r));
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }

まず mergeMapmap の引数 fn に着目しましょう。

mapfn(value: T) => R となっていて、 T 型の値を受け取り R 型の値を返却する関数でした。
これに対して、 mergeMapfn(value: T) => Observable<R> となっていて、 T 型の値を受け取り R 型を発行する observable を返却する関数になります。

次に、 自分自身を subscribe して観測された値をどうするかに着目しましょう。

map は、観測された値 rfn の引数にして、 fn から返却された値を next で発行します。
これに対して、 mergeMap は、観測された値 rfn の引数にして、 fn から返却された observablesubscribe し、そこで観測された値を next で発行します。

どうですか? 違いが分かりますか?
慣れないと何が何だかわからないと思いますので、納得できるまで、何度か繰り返し比較してください。

ちなみに、気付いた方もいると思いますが、 complete が2箇所登場していて、変ですよね? これは後に complete についてフォーカスして説明します。というのも、 complete というのが、実は Rx を使う上で極めて重要な「気にするポイント」なのですが、ここで混ぜて説明すると複雑になるために、今回はイメージ的なコードになっています。

実装した mergeMap を動かしてみる

type Subscriber<T> = {
  next?: (value: T) => void;
  error?: (error: Error) => void;
  complete?: () => void;
}

class Observable<T> {
  private m_subscribeFn: (subscriber: Subscriber<T>) => void;
  public constructor(subscribe:  (subscriber: Subscriber<T>) => void){
    this.m_subscribeFn = subscribe;
  }
  public subscribe(s: Subscriber<T>): void{
    this.m_subscribeFn(s);
  }
  /* 今回追加実装した mergeMap オペレータ */
  public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.subscribe({
        next: (r) => {
          fn(r).subscribe({
            next: (r2) => {
              if(subscriber.next) subscriber.next(r2);
            },
            error: (err) => {
              if(subscriber.error) subscriber.error(err);
            },
            complete: () => {
              if(subscriber.complete) subscriber.complete();
            }
          })
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }
}

/* ベタな非同期関数 */
function asyncFn(a: number, callback: (r: number) => void){
  setTimeout(() => {
    callback(a * 2);
  }, 1000);
}

/* 非同期関数を observable に変換する関数 */
function rxAsyncFn(a: number): Observable<number> {
  return new Observable<number>((subscriber) => {
    asyncFn(a, (r)=>{
      if(subscriber.next) subscriber.next(r);
      if(subscriber.complete) subscriber.complete();
    });
  });
}

/* メイン処理 */
rxAsyncFn(1)
.mergeMap((r)=>{
  return rxAsyncFn(r + 100);
})
.subscribe({
  next: (r) => {
    console.log(r);
  },
  complete: () => {
    console.log("complete");
  }
});
出力
complete
204
complete

あれれ? complete が2回観測されましたね。
実際の rxjs では complete204 が発行された後に1回のみになります。
今回は、 mergeMapfn によって生成された observablesubscribe するというイメージをしていただきたくてシンプルなコードにまとめたのですが、実際の mergeMap はもう少し複雑な実装がされています。
今のところ、 observable が発行する値について話しを進めていますが、後に errorcomplete にフォーカスを当ててそこで説明をします。特に completeRx を使う上で極めて重要な「気にするポイント」です。

ある程度ちゃんと ```observable``` を書こうとするとこんな感じになります
type SubscriberLike<T> = {
  next?: (value: T) => void;
  error?: (error: Error) => void;
  complete?: () => void;
}

type Subscriber<T> = Required<SubscriberLike<T>>;

interface SubscriptionLike {
  unsubscribe(): void;
  readonly Closed: boolean;
}

type SubscriptionStatus = "next" | "error" | "complete";

class CascadeSubscription implements SubscriptionLike {
  protected m_status: SubscriptionStatus = "next";
  protected m_closed = false;
  protected m_superSubscriptions: SubscriptionLike[] = [];
  public get Closed() { return this.m_closed; }
  public unsubscribe() {
    this.m_closed = true;
    this.m_superSubscriptions.forEach((x) => x.unsubscribe());
  }
  public add(superSubscription: SubscriptionLike){
    this.m_superSubscriptions.push(superSubscription);
  }
}

class InternalSubscription<T> extends CascadeSubscription implements Subscriber<T> {
  private m_subscriber: SubscriberLike<T>;

  constructor(subscriber: SubscriberLike<T>){
    super();
    this.m_subscriber = subscriber;
  }

  public next(value: T) {
    if(this.Closed) return;
    if(this.m_status == "next"){
      if(this.m_subscriber.next) this.m_subscriber.next(value);
    }
  }

  public error(error: Error) {
    if(this.Closed) return;
    if(this.m_status == "next"){
      this.m_status = "error";
      if(this.m_subscriber.error) this.m_subscriber.error(error);
    }
  }

  public complete() {
    if(this.Closed) return;
    if(this.m_status == "next"){
      this.m_status = "complete";
      if(this.m_subscriber.complete) this.m_subscriber.complete();
    }
  }
}

class Observable<T> {
  private m_subscribeFn: (subscriber: Subscriber<T>) => void | SubscriptionLike;

  public constructor(subscribe: (subscriber: Subscriber<T>) => void | SubscriptionLike){
    this.m_subscribeFn = subscribe;
  }

  public subscribe(s: SubscriberLike<T>): SubscriptionLike {
    const subscription = new InternalSubscription<T>(s);
    const innerSubscription = this.m_subscribeFn(subscription);
    if(innerSubscription){
      subscription.add(innerSubscription);
    }
    return subscription;
  }

  public map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((subscriber) => {
      const subscription = new InternalSubscription<R>(subscriber);
      const innnerSubscription = this.subscribe({
        next: (r) => {
          subscription.next(fn(r));
        },
        error: (error) => {
          subscription.error(error);
        },
        complete: () => {
          subscription.complete();
        }
      });
      subscription.add(innnerSubscription);
      return subscription;
    });
  }

  public mergeMap<R>(fn: (value: T) => Observable<R>): Observable<R> {
    return new Observable<R>((subscriber) => {
      const subscription = new InternalSubscription<R>(subscriber);
      let fnCount = 0;
      let bComplete = false;
      const innnerSubscription = this.subscribe({
        next: (r) => {
          fnCount++;
          const fnSubscription = fn(r).subscribe({
            next: (value) => {
              subscription.next(value);
            },
            error: (error) => {
              subscription.error(error);
            },
            complete: () => {
              fnCount--;
              if(bComplete && fnCount == 0){
                subscription.complete();
              }
            }
          });
          subscription.add(fnSubscription);
        },
        error: (error) => {
          subscription.error(error);
        },
        complete: () => {
          bComplete = true;
          if(fnCount == 0){
            subscription.complete();
          }
        }
      });
      subscription.add(innnerSubscription);
      return subscription;
    });
  }
}

まとめ

mergeMapmap の違いが理解できましたか?
この mergeMap は非同期処理を連鎖的に呼び出す際に頻出のオペレータです。
Rx を使って複雑なコーディングをしている中で、このような内部的な構造や動きなんて意識しなくなります(これが Rx の良いところでもあります)が、何か壁にぶつかったときには、オペレータがどう振舞うかをイメージすると適切な対応ができるようになると思います。
あ、そうそう、ここまで読んでいただくと 序章の「observableを連鎖させる」が何となく見えてくるんじゃないでしょうか。

次回は、 rxjspipe について説明します。

  1. ちなみに、 rxjs では、 mergeMap に似たもので switchMapconcatMap などがありますが挙動が微妙に違います。特に switchMapmergeMap に似ていますが、非同期処理時では挙動が大きく異なります。この違いを理解せず「どっちでも良いじゃん」的に switchMap を使われると後からメンテナーが見て「意図が伝わらない」というか無駄な心配をしないといけないので( switchMap は連鎖した observalbe が値発行中でも上流の値発行により、以前の observable は後段に値発行しないため処理が抜けます。これを意図しているか悩むという意味です。)、適切なオペレータを使用するよう心掛けてください。

  2. 値の発行が非同期処理を呼び出すキッカケとする場合、値を使わないケースもあります。(案外多いです)

  3. rxjs の実装はもっとケアすることが多く複雑です

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?