0
1

More than 1 year has passed since last update.

【RxJS】Observable・Observer・Operators・Subscription

Last updated at Posted at 2022-07-10

業務でAngularを使い始め、RxJSが複雑だったため、主に使用しているオブジェクトを中心に整理したいと思います。

RxJSとは

Reactive Extensions for JavaScriptの略称で、JavaScriptでリアクティブプログラミングをするためのライブラリ。
RxJSを用いることで、JavaScriptの非同期処理やイベントに関する処理を簡単に書くことができる。

リアクティブプログラミングとは

時間の経過によって変化する値に対する操作を宣言的に記述するプログラミングパラダイム。

時間の経過によって変化する値を流す場所をストリームと呼ぶ。
このストリームにクリックイベントやHTTPデータなどの値が流れ、流れた値に対して加工や計算などの処理をしていく。

よくある例えとして、以下のようなもので説明される。

  • ストリーム = 川
  • ストリームを流れる値 = 魚
  • 処理 = 大きな魚だけ捕まえる

余談ですが、RxJSの動物は魚のような見た目をしていますね。

RxJSで主に使用するオブジェクト

Observable

Observableの前に前提となるデータの受け渡し方について説明する必要がある。

プルとプッシュ

データを作る側をプロデューサー、データを受け取る側をコンシューマーとした上で、データの受け渡し方には以下の2つがある。

  • プル
    コンシューマーをプロデューサーからデータを取り出す方法。
    データを受信するタイミングは受け取り側のコンシューマーが決定する。
    呼び出すことで1つのデータを受け取る関数、複数のデータを順番に取り出すことができるイテレーターがプルに該当する。

  • プッシュ
    プロデューサーがコンシューマーへデータを送信する方法。
    データを送信するタイミングは作る側のプロデューサーが決定する。
    1つのデータをコールバック関数であるコンシューマーに渡すPromise、そして複数のデータを渡すことができるのがObservable
    (なお、Observableは単数のデータも渡すことが出来る)

表にすると以下のように分類できる。

プロデューサー コンシューマー 実装(単数・複数)
プル 要求されたときにデータを作る データがいつ要求されるか決める 関数・イテレーター
プッシュ データを作りいつ送るか決める データを受け取ったら処理する Promise・Observable

上記から、Observableは単数・複数の値を好きなタイミングでコンシューマーにプッシュすることができるオブジェクトといえる。

4つの処理段階

Observableには以下4つの処理段階がある。

  • 作成
  • 購読
  • 実行
  • 破棄

作成

Observableは、コンストラクタに1つのsubscribe関数を渡して作成する。
以下の場合、1秒ごとにHello Worldをコンシューマーであるsubscriberに送信するデータを作成する。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  setInterval(() => {
    subscriber.next('Hello World')
  }, 1000);
});

購読

購読することでObservableのデータをどのように処理するかを定義する。
以下では送信されてきたデータをconsole.logで画面に表示する。

observable.subscribe(x => console.log(x));

subscribeすると、作成したObservablesubscriberに設定した値(ここではHello World)が渡されて実行される。

実行

Observableには以下3種類の実行タイプがある。

  • next
    数値・文字列・オブジェクトなどの値を送信する

  • error
    JavaScriptエラー・例外を送信する

  • complete
    値を送信しない

nextsubscriberに送信するデータを表し、errorcompleteは実行中にどちらか一方だけが実行される。
errorcomplete実行後にはデータは送信されない。

以下の場合、1,2,3までは表示されるが途中でcompelteを実行しているため、4は表示されない。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4);
});

observable.subscribe(x => console.log(x));

破棄

Observableは無限に値を送り続けることができるが、不要になったら無駄なメモリリソースの浪費を避けるために止める必要がある。
Observable.subscribe()を呼び出すとSubscripionオブジェクトを返されるため、実行をキャンセルするにはunsubscribe()を呼び出す。

以下の場合、1,2を表示した後に実行を取り消したため、setIntervablの処理は実行されない。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1)
  subscriber.next(2)
  setInterval(() => {
    subscriber.next('Hello World');
  }, 1000);
});

const a = observable.subscribe(x => console.log(x));
a.unsubscribe()

Observer

Observernexterrorcompleteの3つのコールバックを持ち、Observableからの値を受け取るオブジェクト。

const observer: Observer<number> = {
  next: (value) => console.log(`next:${value}`),
  error: (err) => console.log(err),
  complete: () => console.log("complete"),
}

コールバックは3つ揃っていなくても、Observableの実行は可能。(以下の場合、errorが無くても実行されないだけ)

const observer = {
	next: (value: number) => console.log(`next:${value}`),
	complete: () => console.log("complete"),
};

またsubscribeに関数を直接渡すと、Observerオブジェクトを内部的に作成し、nextのコールバックになる。

observable.subscribe(x => console.log(x));

Operators

Operatorsは複雑な非同期の処理を簡潔に処理することが出来るObservableのメソッド。
Operatorsには以下2種類の関数がある

Pipeable Operators

Pipeable Operatorsは入力としてObservableを受け取り、処理された新しいObservableを返す関数。
オペレーターをチェーンすることでパイプライン処理をすることができる。

  • ex) 渡されたObservableの値をfilterで偶数に絞り、mapで2乗する
import { of, filter, map } from 'rxjs';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 == 0),
    map(x => x * x))
  .subscribe((x) => console.log(x)); 
// 4, 16

Creation Operators

スタンドアロンで新しいObservableを作成するための関数。

  • ex) intervalで1秒ごとに0から連番する
import { interval } from 'rxjs';

const observable = interval(1000);
observable.subscribe(x => console.log(x));
// 0 1 2 3 4 5...

Subscription

SubscriptionObservableの実行中にリソースを破棄するオブジェクト。
上記で記載した通り、unsubscribe()Observableの実行をキャンセルする。

また、Subscription.add()で別のサブスクリプションを追加することで、サブスクリプションを複数持つことができる。
その後、unsubscribe()すると追加したサブスクリプションの実行もキャンセルできる。

import { interval } from 'rxjs';

const observable1 = interval(1000);
const observable2 = interval(2000);

let subscription1 = observable1.subscribe(x => console.log(`sub1:${x}`));
let subscription2 = observable2.subscribe(x => console.log(`sub2:${x}`));

subscription1.add(subscription2);

setInterval(() => {
  subscription1.unsubscribe()
}, 3000);

//sub1:0
//sub1:1
//sub2:0
//sub1:2

参考

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1