0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

CODIANZ流 Rx 解説・TS編 #3: map オペレータ

Last updated at Posted at 2021-03-14

本章の目的

observable の骨格を使って、 Rx の代表的なオペレータ map を紐解きます。

map オペレータ

map オペレータは、値を変換する際に使用します。
実際、業務アプリを作っていると頻出のオペレータです。
例えば、 observable が文字列を発行して、それを JSON オブジェクトとして処理したい時に、この map を使います。

と、ここで rxjs のサンプルを書くと pipe というのがどうにもノイズになるので、 pipe の紹介をするまでは、 observable の骨格を使って説明します。

で、序章で使っていたサンプルの非同期関数は1秒後に値を倍にするというものでした。
この値に100を加算する map を作ってみようと思います。
ゴールはこんな感じです。

今回やってみること
rxAsyncFn(1)
.map((x)=>{
  return x + 100;
})
.subscribe({
  next: (r) => {
    console.log(r);  /* 102 */
  },
  complete: () => {
    console.log("complete");
  }
});

map オペレータの実装

早速、 map オペレータを実装を見てみましょう。

type Subscriber<T> = {
  next?: (value: T) => void;
  error?: (error: Error) => void;
  complete?: () => void;
}

class Observable<T> {
  private m_subscribeFn: (subscriber: Subscriber<T>) => void;
  public constructor(subscribe:  (subscriber: Subscriber<T>) => void){
    this.m_subscribeFn = subscribe;
  }
  public subscribe(s: Subscriber<T>): void{
    this.m_subscribeFn(s);
  }
  /* 今回追加実装した map オペレータ */
  public map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.m_subscribeFn({
        next: (r) => {
          if(subscriber.next) subscriber.next(fn(r));
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }
}

ちょっと、複雑かな?
でも、 return new Observable って何だか見覚えありませんか?
これ、非同期関数を observable にする時に書いたコードですよね。

非同期関数をobservableに変換する関数
function rxAsyncFn(a: number): Observable<number> {
  return new Observable<number>((subscriber) => {
    asyncFn(a, (r)=>{
      if(subscriber.next) subscriber.next(r);
      if(subscriber.complete) subscriber.complete();
    });
  });
}

それと同じで、 map オペレータ、つまり、 map() 関数はオブザーバブルを返却するのです。
つまり、 Rx のオペレータは全て observable を返却します。
Rx の世界は全てが observable なのです。

と、刷り込みはこのくらいにして。。。

この map() は関数 fn を引数に取ります。この関数 fn は、 observable から発行された値を加工する関数です。

発行された値に100を加える関数をmap()に渡している
rxAsyncFn(1)
.map((x)=>{
  return x + 100;
})

では、改めて observablemap() を見てみると

  /* 今回追加実装した map オペレータ */
  public map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.subscribe({
        next: (r) => {
          if(subscriber.next) subscriber.next(fn(r));
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }

observable 自身を subscribe しているのが分かりますか?
そうなんです。
この map() は自分自身を観測して、その観測して、観測された値を fn で変換して、その値を発行するんです。

何だか、バケツリレーやっている感じで面白くないですか?

で、この「自分自身を観測」というのは、全てのオペレータで共通します。
そして、観測したら「何をする」かが違うのです。

実装した map を動かしてみる

type Subscriber<T> = {
  next?: (value: T) => void;
  error?: (error: Error) => void;
  complete?: () => void;
}

class Observable<T> {
  private m_subscribeFn: (subscriber: Subscriber<T>) => void;
  public constructor(subscribe:  (subscriber: Subscriber<T>) => void){
    this.m_subscribeFn = subscribe;
  }
  public subscribe(s: Subscriber<T>): void{
    this.m_subscribeFn(s);
  }
  /* 今回追加実装した map オペレータ */
  public map<R>(fn: (value: T) => R): Observable<R> {
    return new Observable<R>((subscriber) => {
      this.subscribe({
        next: (r) => {
          if(subscriber.next) subscriber.next(fn(r));
        },
        error: (err) => {
          if(subscriber.error) subscriber.error(err);
        },
        complete: () => {
          if(subscriber.complete) subscriber.complete();
        }
      });
    });
  }
}

/* ベタな非同期関数 */
function asyncFn(a: number, callback: (r: number) => void){
  setTimeout(() => {
    callback(a * 2);
  }, 1000);
}

/* 非同期関数を observable に変換する関数 */
function rxAsyncFn(a: number): Observable<number> {
  return new Observable<number>((subscriber) => {
    asyncFn(a, (r)=>{
      if(subscriber.next) subscriber.next(r);
      if(subscriber.complete) subscriber.complete();
    });
  });
}

/* メイン処理 */
rxAsyncFn(1)
.map((x)=>{
  return x + 100;
})
.subscribe({
  next: (r) => {
    console.log(r);
  },
  complete: () => {
    console.log("complete");
  }
});
出力
102
complete

まとめ

本章では、一番基本の map オペレータについて深堀してみました。

Rx 初心者のコードを見ると、細かく subscribe をして処理をつなげる的なコードを書いているのを見かけたりするのですが、この辺りの仕組みが分かると、一連の処理は複数の observable とオペレータの連鎖で構成されて、1つの subscribe で完結することができる、と、いうかそうあるべきじゃないかと思う訳です。なぜなら、その subscribe して処理をつなげるというのは、 Rx が提供している数多のオペレータで実現できるからです。 1

さて、次は mergeMap です。
rxcpp では flat_map なのですが、数年前、「自分は千代田区で一番 flat_map をキーボードタイプしているんじゃないか?」 2 と思うくらい頻出 3 のオペレータです。

  1. 特殊な処理で subscribe を使って新しいオペレータ的なものを作ることはあります

  2. 単なる自意識過剰だな

  3. 実際のプロジェクトで flat_map を検索したら 1200個ほどありました。( map は400個弱)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?