LoginSignup
0
0

[RxJS] RxJS の学習メモ-Operator(3)-Transformation

Last updated at Posted at 2023-11-23

はじめに

本記事は RxJS を使っていく上で行った学習の備忘録になります。主に次に挙げた内容を目的とします。

  • 知らないことによる忌避感をなくす
    • RxJS を使った実装は個人的に初見殺しもいいとこな実装だと思っている
    • 知ることで「あ、別に怖がることないじゃん」という感じに持っていきたい
  • 知見の向上
    • ライブラリを改めてみることでより良い実装の方法、テクニックを得る
  • 思い込みや間違った理解の是正
    • 正しい、最適だと思っていたものが実は間違っていたことも充分あり得るので、その辺が是正できれば御の字

これまでと今回

これまでに次の記事を投稿してまいりました。

今回も Operators についての続きです。

環境

本記事について扱うライブラリや環境の情報です。

備考
RxJS 公式
Learn RxJS リファレンス的な感じの学習サイト
StackBlitz RxJS だけでなく Angular とか React とかの実装お試しができる

なお本記事執筆時の RxJS のバージョンは StackBlitz の DEPENDENCIES を見ると v7.8.0 でした。

この記事でやること

Operators について触れてみる(3)

前回の記事は こちら です。
ご興味あればご覧ください。

今回の記事では次のオペレータについて触れていきます。

本記事で扱うオペレータ

こちらもすべてを扱うのは量的な面で難しいので、個人的によく使うメソッドに対する理解を深めたいと思います。

Transformation Operators

map

map の特徴

  • ストリームで流れてきたデータに対して同期的に処理を行う
  • そして処理を行った結果の値を返し、それが subscribe に流れていく
  • イテレータで言うところの map をイメージするとわかりやすい
import {
  of,
  BehaviorSubject,
} from 'rxjs';

import {
  map,
} from 'rxjs/operators';

// ストリームを受信する受け皿
const receiver$ = new BehaviorSubject<string>('初期値');

// 配信するストリーム
const streamData$ = of('streamData');

receiver$.subscribe((receiver) => {
  console.log(`receiver=${receiver}`);
});

// 文字列を返す例をみる
streamData$
.pipe(
  map((streamData: string) =>  {
    console.log(`map に入ってきたときの値-1回目=${streamData}`);
    streamData = `${streamData} に文字列を加える.`
    return streamData;
  })
)
.subscribe((streamData: string) => {
  // receiver$ には `streamData に文字列を加える.` が流れていく
  receiver$.next(streamData);
});

// 配列を返す例をみる
streamData$
.pipe(
  map((streamData: string) =>  {
    console.log(`map に入ってきたときの値-2回目=${streamData}`);
    return [
      '文字列ではなく',
      '配列や Object を返すこともできる.'
    ];
  })
)
.subscribe((streamData: string[]) => {
  // 配列を連結して receiver$ に流す
  // receiver$ には `文字列ではなく 配列や Object を返すこともできる.` が流れていく
  receiver$.next(streamData.join(' '));
});

この処理を実行すると下記が出力されます。

receiver=初期値
map に入ってきたときの値-1回目=streamData
receiver=streamData に文字列を加える.
map に入ってきたときの値-2回目=streamData
receiver=文字列ではなく 配列や Object を返すこともできる.

出力された内容から receriver$next した際に流れたストリームが同期的に処理されていること、また mapstring でも 配列 でも Object でも返せることが確認できました。

mergeMap

mergeMap の特徴

  • ストリームで流れてきたデータに対して同期的に処理を行う。ここは map と変わらない
    • map との違いは 新しくストリームを生成するか否か
    • map は流れてきた値を加工するが、新たにストリームを生成することはない
  • mergeMap は流れてきたデータをもとに新たなストリームを生成する。(これは後述の switchMapconcatMap も同じ)
  • 新しく生成したストリームは、それを対象に map で処理しても良いし、そのまま subscribe に流すことも出来る
  • mergeMap は流れてきた ストリームの同期処理が片付いた順 に次の処理にデータが流れる
import { BehaviorSubject, Observable, of } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';

interface StreamData {
  sentence: string;
  delay: number;
}

// 初期データなので一番最初に出力される
const receiver$ = new BehaviorSubject<StreamData>({
  sentence: '最初に流すストリーム',
  delay: 0,
});

receiver$
  .pipe(
    mergeMap((streamData: StreamData, index: number) => {
      return delayProcess$(index, streamData.sentence, streamData.delay)
    }),
  )
  .subscribe({
    next: (value: string): void => {
      console.log(value);
    }
  }
);

// receiver$ に対してストリームを流す
// receiver$ 内の mergeMap では新たなストリームを生成している
//
// で、 mergeMap は処理が終わった順に流れるので、このケースは delay の小さな順に処理が実行される
receiver$.next({ sentence: '最後に流れるストリーム', delay: 500 }); // 初期データが流れたあと、最後に出力される
receiver$.next({ sentence: '3番目に流れるストリーム', delay: 300 }); // 初期データが流れたあと、3番めに出力される
receiver$.next({ sentence: '2番目に流れるストリーム', delay: 100 }); // 初期データが流れたあと、2番めに出力される

function delayProcess$(index: number, sentence: string, delayTime: number): Observable<string> {
  return of(`[${index}] resultData: ${sentence} -> resolved`).pipe(
    // delay は指定した時間( msec )遅延してくれる
    delay(delayTime)
  )
}

この処理を実行すると下記が出力されます。

[0] resultData: 最初に流すストリーム -> resolved
[3] resultData: 2番目に流れるストリーム -> resolved
[2] resultData: 3番目に流れるストリーム -> resolved
[1] resultData: 最後に流れるストリーム -> resolved

[] 内の数値は実行順序です。
実行順序と出力結果から、 next で流れたストリームは指定した遅延時間にそって処理されていることがわかります。
つまり

  • mergeMap は流れてきた ストリームの同期処理が片付いた順 に次の処理にデータが流れる

の動きをこのサンプルコードから確認できました。

switchMap

switchMap の特徴

  • 大まかな部分は mergeMap と一緒
  • switchMap最後に処理されたストリームだけを対象に処理 をして次の処理にデータが流れる
  • つまり 最後に流れてくるストリームの前に処理されていた内容はキャンセル される
import { BehaviorSubject, Observable, of } from 'rxjs';
import { delay, switchMap } from 'rxjs/operators';

interface StreamData {
  sentence: string;
  delay: number;
}

// 初期データなので一番最初に出力される
const receiver$ = new BehaviorSubject<StreamData>({
  sentence: '最初に流すストリーム',
  delay: 0,
});

receiver$
  .pipe(
    switchMap((streamData: StreamData, index: number) => {
      return delayProcess$(index, streamData.sentence, streamData.delay)
    }),
  )
  .subscribe({
    next: (value: string): void => {
      console.log(value);
    }
  }
);

// receiver$ に対してストリームを流す
// receiver$ 内の switchMap では新たなストリームを生成している
//
// で、 switchMap は最後に流れてきたストリームだけを処理するので、 3番目の next だけが出力される
receiver$.next({ sentence: 'このストリームは出力されない', delay: 500 }); // 出力されない
receiver$.next({ sentence: 'このストリームは出力されない', delay: 300 }); // 出力されない
receiver$.next({ sentence: 'このストリームだけが出力される', delay: 100 }); // 出力される

function delayProcess$(index: number, sentence: string, delayTime: number): Observable<string> {
  return of(`[${index}] resultData: ${sentence} -> resolved`).pipe(
    // delay は指定した時間( msec )遅延してくれる
    delay(delayTime)
  )
}

この処理を実行すると下記が出力されます。

[3] resultData: このストリームだけが出力される -> resolved

[] 内の数値は実行順序です。
実行順序と出力結果から、最後に実行された next のストリームだけが処理されていることがわかります。
というわけで

  • switchMap最後に処理されたストリームだけを対象に処理 をして次の処理にデータが流れる
  • つまり 最後に流れてくるストリームの前に処理されていた内容はキャンセル される

動きをこのサンプルコードから確認できました。

concatMap

concatMap の特徴

  • こちらも大まかな部分は mergeMap と一緒
  • concatMapストリームが流れてきた 順 に処理を行い次の処理にデータが流れる
import { BehaviorSubject, Observable, of } from 'rxjs';
import { delay, concatMap } from 'rxjs/operators';

interface StreamData {
  sentence: string;
  delay: number;
}

// 初期データなので一番最初に出力される
const receiver$ = new BehaviorSubject<StreamData>({
  sentence: '最初に流すストリーム',
  delay: 0,
});

receiver$
  .pipe(
    concatMap((streamData: StreamData, index: number) => {
      return delayProcess$(index, streamData.sentence, streamData.delay)
    }),
  )
  .subscribe({
    next: (value: string): void => {
      console.log(value);
    }
  }
);

// receiver$ に対してストリームを流す
// receiver$ 内の concatMap では新たなストリームを生成している
//
// で、 concatMap はストリームが流れた順に処理されるので、 delay の大小に関係なく上から出力される
receiver$.next({ sentence: '2番目に流れるストリーム', delay: 500 }); // 初期データが流れたあと、2番目に出力される
receiver$.next({ sentence: '3番目に流れるストリーム', delay: 300 }); // 初期データが流れたあと、3番目に出力される
receiver$.next({ sentence: '最後に流れるストリーム', delay: 100 }); // 初期データが流れたあと、最後に出力される

function delayProcess$(index: number, sentence: string, delayTime: number): Observable<string> {
  return of(`[${index}] resultData: ${sentence} -> resolved`).pipe(
    // delay は指定した時間( msec )遅延してくれる
    delay(delayTime)
  )
}

この処理を実行すると下記が出力されます。

[0] resultData: 最初に流すストリーム -> resolved
[1] resultData: 2番目に流れるストリーム -> resolved
[2] resultData: 3番目に流れるストリーム -> resolved
[3] resultData: 最後に流れるストリーム -> resolved

[] 内の数値は実行順序です。
実行順序と出力結果から、next の実行順にストリームが処理されていることがわかります。
というわけで

  • concatMapストリームが流れてきた 順 に処理を行い次の処理にデータが流れる

動きをこのサンプルコードから確認できました。

補足

コード中の delayUtility Operators のオペレータです。
当該オペレータの詳細についてはリンク先をご参照ください。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0