0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RxJSの実装に関して

Last updated at Posted at 2022-05-19

はじめに

RxJSを用いた実装をやってみたが、その実装に関して備忘録を残す。
(コード自体はコードを参照ください)

"this.stream$ = this.stream.asObservable();"に関して

まず、SubjectはObservableでもありObserverでもあると言われる(以下、公式からの引用)。

Every Subject is an Observable.(すべてのサブジェクトはObservableである)
Every Subject is an Observer.(すべてのサブジェクトはObserverである)

そのため、SubjectはObservableが持つデータを流すための"next()"メソッド(subscribe()時に引数で渡ってくるObserverのnext, error, completeという3つのcallbackの内のnextを実行する)を持ち、Subjectに新しくデータを渡す(データを流す)事ができる(以下、公式からの引用)。具体的的には以下のような実装ができるという事。

To feed a new value to the Subject, just call next(theValue),(新しい値をSubjectに与えるには、next(theValue)を呼び出すだけです)

import { Subject } from 'rxjs';

const subject = new Subject();
const subscription = subject.subscribe((x) => console.log(x)); // Subjectから値の受信を開始(この時点では何も流れていないので…)

subject.next("hogehoge"); // Subjectのnextメソッドを実行しデータを流す(データのpushを実施)

ここで、1つ困るのはSubjectをそのまま公開(ソースコード上で参照できるように)してしまうと、どこからでもそのSubjectに対し"next()"を呼び出されてしまい、意図せずデータがSubjectに渡る(データが流れる)という事が起きてしまう。そこで、"asObservable()"メソッドでSubjectをObservableに変換するという事が行われる (以下、公式Using asObservableからの引用)。

Creates a new Observable with this Subject as the source. You can do this to create customize Observer-side logic of the Subject and conceal it from code that uses the Observable.(このSubjectをソースとして、新しいObservableを作成します。これによって、Subject の Observer 側ロジックをカスタマイズして作成し、Observable を使用するコードから隠蔽することができます。)

The asObservable operator can be used to transform a subject into an observable. This can be useful when you’d like to expose the data from the subject, but at the same time prevent having data inadvertently pushed into the subject:(asObservable演算子は、サブジェクトをobservableに変換するために使用することができます。これは、サブジェクトからデータを公開したいが、同時にデータが不用意にサブジェクトに押し込まれるのを防ぎたい場合に便利です。)

※上記で隠蔽と言っているのは、Subjectそのものを公開せずObservableを公開するようにする事で、Subjectが持つnext()メソッドなどで意図せずデータが流されるという事を防げるという事。

前置きが長くなったが、今回の実装の中に出てくる以下のコードは上記の事を意識してわざと"asObservable()"メソッドでObservableを作成している(実際にはVueのdataに両方とも定義されているので、今回に限って言えば"this.stream"でどこからでも参照できてしまうが…。この辺りがデモなので少し雑に作っているといった部分)。

created() {
  this.stream = new Subject();
  this.stream$ = this.stream.asObservable();
},

・参考:Subject

"this.subscription.unsubscribe();"に関して Subscriptionの破棄(Observableの実行の破棄)

今回は、RxJSの複数のオペレーターの実際の動きを見ていくにあたり、Observableに対するpipe演算子を切り替える必要があった。その切り替えには以下のようにVueのメソッドでObservableに対するpipe演算子を切り替えできるような実装を考えた。

methods: {
  of() {
    return this.stream$;
  },
  map() {
    return this.stream$.pipe(map((x) => `map: ${x}`));
  },
...

ただ、一度Observableをsubscribe()すると、Subscriptionが作成され、subscribe()時に指定していたpipe演算子でしかデータが流れないという事になってしまう。そこで、以下のようにセレクトボックスの選択内容に応じて、pipe演算子の実装が切り替えできるように、Subscriptionの破棄をした上で、再度新しくObservableをsubscribe()する事でpipe演算子の実装を動的に切り替えられるようにした("this.subscription.unsubscribe();"は一度作成されたSubscriptionの破棄のために行っている)。


※上記で出てきたSubscriptionについて補足をすると、公式に書かれている通り、SubscriptionとはObservableの実行の事。

A Subscription is an object that represents a disposable resource, usually the execution of an Observable. (Subscriptionは使い捨てのリソースを表すオブジェクトで、通常はObservableの実行を表します)

ちょっと上記だと分かりにくいので、日常のサブスクリプションの概念をイメージしてみる。継続的に商品を購入している場面を想像して、それをRxJSの言葉と対比させる以下のようになる。

・継続的な商品購入のサービス=Observable
・継続的な商品購入のサービスを開始する・契約する=subscribe()メソッドの実行
・継続的な商品購入が実行されている状態=Subscription

つまり、Observable(継続的な商品購入のサービス)はデータを流す(日常の話であれば商品を届ける)が、それには流す開始の合図(日常の話であればサービス開始・契約)が必要になり、その開始の合図がsubcribe()メソッドの実行という事。そして開始によりObservableからデータが流れる(日常の話であれば商品が届く)が、このデータが流れている状態(Observableの実行でデータが流れている状態)がSubscription(日常の話であれば継続的購入をしている状態=サブスクリプション)。

・参考:RxJS Operators
・参考:Disposing Observable Executions

コード

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="UTF-8" />
    <meta http-equiv="X-UA-Compatible" content="IE=edge" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Understand RxJS</title>
    <link
      href="https://cdn.jsdelivr.net/npm/@mdi/font@6.x/css/materialdesignicons.min.css"
      rel="stylesheet"
    />
    <link
      href="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.min.css"
      rel="stylesheet"
    />
    <link
      rel="stylesheet"
      href="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/styles/vs2015.min.css"
    />
    <style>
      html,
      body {
        height: 100%;
      }
      .fade-enter {
        left: -30px;
      }
      .slide-enter-active {
        animation: slide-in 2s reverse;
      }
      @keyframes slide-in {
        from {
          transform: translateX(100%);
        }
        to {
          transform: translateX(-30px);
        }
      }
      .stream-line {
        position: relative;
        display: block;
        min-height: 50px;
      }
      .stream-line:after {
        content: "";
        position: absolute;
        top: calc(50%);
        width: 100%;
        height: 1px;
        background-color: black;
      }
      .stream-item {
        position: absolute;
        list-style: none;
        top: calc(50% - 20px);
        left: 0;
        width: 100%;
      }
      .log-area {
        overflow: auto;
        height: 150px;
      }
      ul {
        list-style: none;
      }
      /* vuetifyのCSS上書き */
      .theme--light.v-application code {
        background-color: #1e1e1e;
        color: #dcdcdc;
      }
    </style>
  </head>
  <body>
    <v-app id="app">
      <v-main>
        <v-container>
          <h1>RxJSのオペレーターを視覚的に理解する</h1>

          <div class="stream-line my-5">
            <transition-group name="slide" tag="ul" @after-enter="afterEnter">
              <li v-for="(message, i) in messages" :key="i" class="stream-item">
                <v-icon
                  v-if="message.toString().startsWith('a')"
                  color="primary"
                  x-large
                  >mdi-circle
                </v-icon>
                <v-icon v-else x-large>mdi-circle</v-icon>
                <input type="hidden" :value="message" />
              </li>
            </transition-group>
          </div>

          <v-card class="mx-auto" max-width="344" outlined>
            <v-card-text class="log-area">
              <ul>
                <li v-for="(log, i) in logs" :key="i">{{ log }}</li>
              </ul>
            </v-card-text>
            <v-card-actions>
              <v-spacer></v-spacer>
              <v-btn @click="clear"> クリア </v-btn>
            </v-card-actions>
          </v-card>

          <v-select
            :value="selected"
            @input="select"
            :items="items"
            item-text="text"
            item-value="value"
          ></v-select>
          <v-text-field v-model="message" outlined clearable type="text">
            <template #append-outer>
              <v-btn color="primary" @click="push" :disabled="disabledBtn">
                送信
              </v-btn>
            </template>
          </v-text-field>

          <pre v-if="selected === 'of'" key="of">
            <code class="javascript">
            /** https://rxjs-dev.translate.goog/api/index/function/of */  
            import { of } from 'rxjs';
 
            // 単に受け取った値(データ)をストリームに流す
            const subscription = of(10, 20, 30).subscribe({
              next: (next) => console.log("next:", next),
              error: (err) => console.log("error:", err),
              complete: () => console.log("the end"),
            });

            /** Outputs */
            // next: 10
            // next: 20
            // next: 30
            // the end
            </code>
          </pre>
          <pre v-if="selected === 'interval'" key="interval">
            <code class="javascript">
              /** https://rxjs.dev/api/index/function/interval */  
              import { interval } from 'rxjs';
 
              // 指定時間ごとに連番でデータをストリームに流す
              const subscription = interval(1000).subscribe(
                text => console.log('Next: ', text)
              );
 
              /** Outputs */
              // Next: 0
              // Next: 1
              // Next: 2
              // ...
            </code>
          </pre>
          <pre v-if="selected === 'merge'" key="merge">
            <code class="javascript">
              /** https://rxjs.dev/api/index/function/merge */  
              import { merge, interval } from 'rxjs';
              import { take } from 'rxjs/operators';
   
              // 複数のObservableを1本にまとめる(1本のストリームに流す)
              const subscription = merge(
                interval(1200).pipe(
                  take(5),
                  map((index) => `a: ${index + 1} / 5`)
                ),
                interval(500).pipe(
                  take(10),
                  map((index) => `b: ${index + 1} / 10`)
                )
              ).subscribe(
                text => console.log(text)
              );
   
              /** Outputs */
              // 1: b: 1 / 10
              // 2: b: 2 / 10
              // 3: a: 1 / 5
              // 4: b: 3 / 10
              // ...
            </code>
          </pre>
          <pre v-if="selected === 'map'" key="map">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/map */  
              import { of } from 'rxjs';
              import { map } from 'rxjs/operators';
   
              // ストリームに流れるデータを整形する
              const subscription = of(1, 2, 3).pipe(
                map((text) => `maped!: ${textx}`)
              ).subscribe((text) => console.log(text));
   
              /** Outputs */
              // maped!: 1
              // maped!: 2
              // maped!: 3
            </code>
          </pre>
          <pre v-if="selected === 'switchMap'" key="switchMap">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/switchMap */  
              import { Observable } from 'rxjs';
              import { switchMap } from 'rxjs/operators';
   
              // ストリームAに流れるデータを元に、別のストリームBを流す
              // ※別のストリームBが流れ終わる前に、元のストリームAにデータが流れると、
              //   ストリームBは破棄され、最初からストリームBが流れる
              const subscription = Observable.pipe(
                switchMap((text) => {
                  return interval(1500).pipe(
                    take(3),
                    map((index) => `switchMaped: ${index} => ${text}`)
                  );
                })
              );
   
              /** Outputs */
              // switchMaped: 0 => hoge
              // switchMaped: 1 => hoge
              // switchMaped: 2 => hoge
            </code>
          </pre>
          <pre v-if="selected === 'filter'" key="filter">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/filter */  
              import { Observable } from 'rxjs';
              import { filter } from 'rxjs/operators';
   
              // 条件に合致するデータのみストリームに流す
              const subscription = Observable.pipe(
                filter((text) => text === "test")
              ).subscribe((text) => console.log(text));
   
              /** Outputs */
              // test
            </code>
          </pre>
          <pre v-if="selected === 'throttleTime'" key="throttleTime">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/throttleTime */  
              import { Observable } from 'rxjs';
              import { throttleTime } from 'rxjs/operators';
   
              // 指定時間だけ連続するデータを無視する(指定時間だけデータを間引きストリームにデータを流す)
              const subscription = Observable.pipe(throttleTime(1000)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(1000ミリ秒経過ごとに出力される)
            </code>
          </pre>
          <pre v-if="selected === 'debounceTime'" key="debounceTime">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/debounceTime */  
              import { Observable } from 'rxjs';
              import { debounceTime } from 'rxjs/operators';
   
              // 連続するデータの最後のデータを、指定時間経過後に1度だけストリームに流す
              const subscription = Observable.pipe(debounceTime(250)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(250ミリ秒経過後1回だけ出力される)
            </code>
          </pre>
          <pre
            v-if="selected === 'distinctUntilChanged'"
            key="distinctUntilChanged"
          >
            <code class="javascript">
              /** https://rxjs.dev/api/operators/distinctUntilChanged */  
              import { Observable } from 'rxjs';
              import { distinctUntilChanged } from 'rxjs/operators';
   
              // 前回と値が異なるデータのみストリームに流す
              const subscription = Observable.pipe(distinctUntilChanged()).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // hoge
              // foo
            </code>
          </pre>
          <pre v-if="selected === 'skip'" key="skip">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/skip */  
              import { Observable } from 'rxjs';
              import { skip } from 'rxjs/operators';
   
              // 指定回数のデータをストリームに流さずにスキップする
              const subscription = Observable.pipe(skip(3)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(4回送信をクリックした後に出力される)
            </code>
          </pre>
          <pre v-if="selected === 'take'" key="take">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/take */  
              import { Observable } from 'rxjs';
              import { take } from 'rxjs/operators';
   
              // 指定回数のみデータをストリームに流す
              const subscription = Observable.pipe(take(3)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test
              // test
              // test(これ以降はログが出力されない)
            </code>
          </pre>
        </v-container>
      </v-main>
    </v-app>

    <script src="https://cdn.jsdelivr.net/npm/vue@2.x/dist/vue.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.js"></script>
    <script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
    <script src="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/highlight.min.js"></script>
    <script>
      const { Subject, of, interval, merge } = rxjs;
      const {
        take,
        map,
        switchMap,
        filter,
        throttleTime,
        debounceTime,
        distinctUntilChanged,
        skip,
      } = rxjs.operators;

      new Vue({
        el: "#app",
        vuetify: new Vuetify(),
        data: () => ({
          selected: "nothing",
          items: [
            {
              text: "nothing(「送信」をクリックする事でストリームにデータが流れる様子をデモできます。一部disabledもあります。)",
              value: "nothing",
            },
            { text: "[Creation Operators] of()", value: "of" },
            { text: "[Creation Operators] interval()", value: "interval" },
            { text: "[Join Creation Operators] merge()", value: "merge" },
            { text: "[Transformation Operators] map()", value: "map" },
            {
              text: "[Transformation Operators] switchMap()",
              value: "switchMap",
            },
            {
              text: "[Filtering Operators] filter()",
              value: "filter",
            },
            {
              text: "[Filtering Operators] throttleTime()",
              value: "throttleTime",
            },
            {
              text: "[Filtering Operators] debounceTime()",
              value: "debounceTime",
            },
            {
              text: "[Filtering Operators] distinctUntilChanged()",
              value: "distinctUntilChanged",
            },
            { text: "[Filtering Operators] skip()", value: "skip" },
            { text: "[Filtering Operators] take()", value: "take" },
          ],
          stream: null,
          stream$: null,
          subscription: null,
          counter: 0,
          logs: [],
          message: null,
          messages: [],
        }),
        computed: {
          disabledBtn() {
            return (
              this.selected === "of" ||
              this.selected === "interval" ||
              this.selected === "merge"
            );
          },
        },
        created() {
          this.stream = new Subject();
          this.stream$ = this.stream.asObservable();
          this.subscription = this.stream$.subscribe((text) =>
            this.messages.push(text)
          );
        },
        methods: {
          select(value) {
            this.selected = value;
            setTimeout(() => {
              hljs.highlightAll();
            }, 50);

            this.subscription.unsubscribe();
            this.subscription = this[value]().subscribe((text) => {
              this.messages.push(text);
            });
          },
          push() {
            this.stream.next(this.message || "not input");
          },
          addLog(value) {
            this.logs.push(`${++this.counter}: ${value}`);
          },
          clear() {
            this.logs.splice(0, this.logs.length);
            this.messages.splice(0, this.messages.length);
            this.counter = 0;
          },
          afterEnter(el) {
            this.addLog(el.querySelector("input").value);
            el.style = "display: none;";
          },
          nothing() {
            return this.stream$;
          },
          // https://rxjs.dev/guide/operators#creation-operators-2
          of() {
            return of(10, 20, 30);
          },
          interval() {
            return interval(1000);
          },
          // https://rxjs.dev/guide/operators#join-creation-operators
          merge() {
            return merge(
              interval(1200).pipe(
                take(5),
                map((index) => `a: ${index + 1} / 5`)
              ),
              interval(500).pipe(
                take(10),
                map((index) => `b: ${index + 1} / 10`)
              )
            );
          },
          // https://rxjs.dev/guide/operators#transformation-operators
          map() {
            return this.stream$.pipe(map((text) => `maped!: ${text}`));
          },
          switchMap() {
            return this.stream$.pipe(
              switchMap((text) => {
                return interval(1500).pipe(
                  take(3),
                  map((index) => `switchMaped: ${index} => ${text}`)
                );
              })
            );
          },
          // https://rxjs.dev/guide/operators#filtering-operators
          filter() {
            return this.stream$.pipe(filter((text) => text === "test"));
          },
          throttleTime() {
            return this.stream$.pipe(throttleTime(1000));
          },
          debounceTime() {
            return this.stream$.pipe(debounceTime(250));
          },
          distinctUntilChanged() {
            return this.stream$.pipe(distinctUntilChanged());
          },
          skip() {
            return this.stream$.pipe(skip(3));
          },
          take() {
            return this.stream$.pipe(take(3));
          },
        },
      });
    </script>
  </body>
</html>
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?