2
1

More than 1 year has passed since last update.

RxJS公式Overview要約(備忘録)

Last updated at Posted at 2020-10-10

僕もJavascript触り出したの最近で非同期処理という言葉を認識して勉強し出したのはこれがほぼ初めてなので勉強がてら書いていきます。間違っていることがあれば指摘してくれると嬉しいです。
原典はここです。https://rxjs-dev.firebaseapp.com/guide/overview

そもそも非同期処理って?

同期的な処理とは上から順に実行される処理です。反対に非同期処理とはクリックなどのイベントが起こるまで実行されない処理です。document.addEventListner("click",()=>{console.log("クリックされたよ!")})}などが非同期処理の典型例です。

RxJSを使うと何が嬉しい?

非同期処理の流れがわかりやすく書ける。RxJSを用いないとコールバック地獄と呼ばれるIf文が複雑に絡み合った大変読みづらいプログラムになってしまう。

イントロダクションを行う上でのローカル環境構築

クリックとかのブラウザに関するイベントのために一応説明しておく。ただし、1回しか使わないので飛ばしてもらっても問題ないです。他はVSCodeのデバッガーで動きます。
あとあとAngularで使いたいと思って勉強してるのでAngular上で実行していく。(Angularのインストールは各々してください。)
もちろん、Angularを用いずに環境構築してくれても全く問題ない。
ターミナル上でng new rxjs-practiceでプロジェクトを作成し、ng g c rxjsで作成したコンポーネントの中で実行するのが手軽だろう。以下がその例である。

app.component.html
<app-rxjs></app-rxjs>
rxjs.component.ts
import { Component, OnInit } from '@angular/core';
import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';

@Component({
  selector: 'app-rxjs',
  templateUrl: './rxjs.component.html',
  styleUrls: ['./rxjs.component.css']
})
export class RxjsComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
    fromEvent(document, 'click')
    .pipe(
      throttleTime(1000),
      scan(count => count + 1, 0)
    )
    .subscribe(count => console.log(`Clicked ${count} times`));
  }

}

これだけ書いてng serve --openと実行すればangularを触ったことがない人でもRxJSをローカルで動かすことができるようになるだろう。

全く同じ動作をする公式イントロダクションのなかのコードはこちらである。

import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map(event => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe(count => console.log(count));

ngOnInitのなかにコードを書けばコンポーネント生成時にその中のコードが実行される。今回はRxJSをローカルで動かすことを目的としているのでこれ以上はAngularについては触れない。サンプルコードを適宜importngOnInitに振り分けてください。

PullとPush

大まかに言えば
- Pull : 呼び出し元が呼び出すタイミングを決める。呼び出された側はいつ呼び出されるかわからない。(例:Generator)
- Push : 呼び出された側がいつ呼び出されるか決める。呼びだし元はいつ呼び出すかわからない。(例:Promiss)

RxJSはPushに属する。話をわかりやすくするために例を示す。

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

出力は

console
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

となる。subscribeによってobservableが呼び出される(以下では購読されると言う)とただちに1,2,3の値が購読者に渡され関数next(x)の引数として実行される。しかし、その後、呼び出され側(以下ではObservableと呼ぶ)が1000msタイムアウトしているのでその間next(x)は実行されず、そのあとのconsole.log('just after subscribe');が実行される。1000ms後にnext(x)xとして4が渡される。ここでObservableは終了し購読者のcomplete() { console.log('done'); }
});
が実行される。これは値を作る側が処理の実行タイミングを決めているのPushなのである。

Observable

Observableの作成、購読

以下の例では一秒毎にcountの値が1つ大きくなり購読者に渡される。observableを作成する際はnew Observable(subscriber=>{//処理//subscriber.next()})のようにすれば良い。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  let count = 0
  let id = setInterval(() => {
    count++
    subscriber.next(count)
    }, 1000);
  }
);
observable.subscribe(x => console.log(x));

購読する際は今までもみてきたように

observable.subscribe(x=>f(x));

とすればよい。

また、複数の購読間で状態は共有されない。以下の例を見るとわかりやすいだろう。(以下ではimportは省略する。コードエディター上でエラーが出ると思うので適切に対処してほしい。)

code
const observable = new Observable(function subscribe(subscriber) {
    subscriber.next(1)
    setTimeout(()=>{subscriber.next(2);},2000)
  }
);
observable.subscribe(x=>{console.log(`A: ${x}`)})
observable.subscribe(x=>{console.log(`B: ${x}`)})

このコードの出力は以下である。

A:1
B:1
A:2
B:2

これを見るとたしかに購読が共有されていないことがわかる。observable.subscribenew Observable(function subscribe(subscriber) {...})の中で定義されたsubscriberを呼び出すだけにすぎない。また、これはObservable executionとも呼ばれる。一度の購読につき一度だけ実行されるが、これが渡す通知は以下の3種類である。

  • "Next"通知: 何かしらの値を渡す。
  • "Error"通知 : エラーもしくは例外を渡す。
  • "Complete"通知 : 何も渡さない。

購読の終わらせ方

一度、始めた購読は何かしらの方法で終了させねばずっと継続されるが、購読はそれぞれ独立である。ただし、購読に変数名をつけることで購読を好きなタイミングで終了させることができる。const sub = observable.subscribe(x=>console.log("obserbver: ",x))のように変数名をつけた後でsubscribe.unsubscribe()とすれば良い。observableの定義の中にあるreturnの中にunsubscribe時に実行したい処理を書くことができる。書かなければ購読が中止されるだけであり、以下の例ではこれをかかないと裏でIntervalが動き続ける。

const observable = new Observable(subscriber => {
    let count = 0
    const id = setInterval(() => {
      count++
      subscriber.next(count)
      console.log("timer: ",count)
    }, 1000)
    return () => {
        console.log("unsubscribe")
        setTimeout(()=>{clearInterval(id)},2000)
    }
})
const sub = observable.subscribe(x=>console.log("obserbver: ",x))
setTimeout(() => sub.unsubscribe(), 2000)

出力は

obserbver:  1
timer:  1
unsubscribe
timer:  2
timer:  3

となる。この出力を見てわかるようにunsubscribeされたからと言って内部の処理がすぐに止まるわけではないので、内部でAddEventListenrなどを使っている時はremoveEveentListerreturnのあとに書くことを忘れないようにしてください。

Observer

observerとは単に以下の形をした連想配列である。(いくつかの要素が欠けていても問題ないが、欠けた要素から通知が来ても反応しない。例えば、絶対に失敗しないことがわかっているのならerrorの処理を書かなくても良い。)

const observer = {
  next?: x => f(x),//成功した時の処理
  error?: err => g(err),//エラーや例外が発生した時の処理
  complete?: () => h(),//subscribeが終了した時の処理
};

observerを使うには以下のように

observable.subscribe(observer);

subscribeの中にいれればよい。

Operator

Operatorは以下の2種に分類できる
- Creation Operators : 配列などを元に新しいObservableを作る。
- Pipeable Operators : Observableを元に新しいObservableを作る。

Creation Operators

とりあえず、offromだけ説明しておく
使い方は見たまんまである。

import {of,from} from 'rxjs';
let ofObservable = of(1,2,3)
let fromObservable = from([1,2,3])

Pipeable Oparators

Observableから送られてきた値を加工してに新たなObservableを作る。元のObservableは変化していない。
使い方は以下のようにする。

of(1,2,3).pipe(map(x=>x*x)).subscribe(x=>console.log(x))

コンソールは

1
4
9

である。他にも色々種類があるが、多すぎるので適宜紹介していきたい。
また、Pipeable Operatorはつなげることができる。たとえば、以下のように用いる

of(1,2,3).pipe(map(x=>x*x),filter(x=>x>3).subscribe(x=>console.log(x))

コンソールは

4
9

である。

Pipeable Operatorの作り方

せっかくなのでオペレータ(pipeable operator)の作り方も説明する。基本的には自作する必要はないだろうが、作り方を知っておくことでオペレータに対する理解が深まるはずだ。オペレータとは何かというとObservableを受け取ってObservableを返す関数を返す関数だ。

見本としてmapオペレータを自作してみる。実際のものも似たような実装になってるんじゃないかなあ(憶測)

import { Observable ,of} from 'rxjs'

function newMap<T,U>(f:(x:T)=>U){
    return (observable:Observable<T>) => new Observable<U>(observer=>{
        const subscription = observable.subscribe({
            next(value){
                const out = f(value)
                observer.next(out)
            },
            error(err){observer.error(err)},
            complete(){observer.complete}
        })
        return ()=>{
            subscription.unsubscribe()
            console.log("unsubscribe")
        }
    })
}
let a = newMap((x:number)=>(x*x))(of(1,2,3)).subscribe((x)=>{console.log(x)})
a.unsubscribe()

返ってくる関数を見てみるとobservableを引数に取り、戻り値となるObservableは購読されるとobservableの購読を開始し渡された値valueを関数f(value)に代入し、その値をObservableの値としてその購読者へと渡す。最後のreturnの中にはunsubscribeメソッドが呼び出された時の処理を書く。今回はobservableへの購読を終了し、尚且つコンソールにunsubscribeと出力されるようにした。
出力は

1
4
9
unsubscribe

となり、実際に動くことがわかる。

Subscription

先ほども見たが

let a = observable.sebscribe(observable)

aのことである。このようにすることでa.unsubscribe()で購読をやめることができる。
しかし、他にも使い道がある。たとえば、一つの購読をやめたとき他の購読も一緒にやめたいなどの処理をするときだ。それには以下のようにaddを用いる。

import { interval } from 'rxjs';

const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

コンソールは以下の通りである。

second: 0
first: 0
second: 1
first: 1
second: 2

上のコードではsubscription.unsubscription()をした時に同時にchildsubscriptionの購読も終了する。addがあるといことはremoveもある。働きは想像通りである。

Subject

Subjectとは複数の購読間で共有できる特殊なObservableである。
観測者の立場からするとただObservableSubjectのどちらか区別できない。内部実装の話をするとSubjectにたいする購読は値を届ける新しい実行を始めない。通常のAddListnerなどのように購読のリストを保持するだけである。
また、Subjectでは購読側からnextメソッドを使って新しく値を渡すことができる。

const subject = new Subject<number>();

let a = subject
a.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
let b = subject
b.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

a.next(1);
a.next(2);

この操作コードの出力はどうなるであろうか?答えは

observerA:1
observerB:1
observerA:2
observerB:2

となる。一つの購読からObservableに値を渡すと全体の購読間で値が共有されるのだ。これはSPAでコンポーネント間でイベントやデータを共有したい時に用いることができる。

refCount()あたりの話は最初はイントロとしては盛り込みすぎかなと思うので省きます。
ただ、connect()refCount()あたりのコードをTypeScriptで実行すると、型エラーが発生します。これは何故かと言うとconnect()メソッドを使うためにはConnectableSubject型である必要があり、multicast(subject)を行うとこの型に変換されるはずですが、pipeの中でオペレータを使用しても適切に型推定がされないから起こります。なので、ここでは型アサーションを行うか、pipeを使わずにmulticast(subject)(source)と書く必要があります。

BehaviorSubject

最新の値を持つことができる便利なSubjectです。
値を取得するには以下のように.valueで読み出してください。

    const subject = new BehaviorSubject(0);     
    subject.subscribe({
      next: (v) => console.log(`observerA: ${v}`)
    });
    subject.next(1)
    let b = subject.value
    console.log("value:",b)
    subject.next(2)

出力は

observerA: 0
observerA: 1
value:1
observerA: 2

ReplaySubject

複数の値を保持しておき、新たな購読が始まった際にいままでの値を全て新しい購読に渡します。
どれだけの数の値を保持するか、何ミリ秒前の値まで保持するかということも指定することができます。
使い方は下のコードを見ながら解説していきます。

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 1000);

出力

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

最初のconst subject = new ReplaySubject(100, 500 /* windowTime */);で値を100個までかつ500ms前の値までを保持するとしていします。
その後、200msに一度値がObservableに渡されるのでObserverAは通常通り値を受け取ります。しかし、1000msに到達した時ObserverBの購読が始まります。ここからがただのSubjectと挙動が異なります。500ms前までのSubjectの値を持っているので購読が始まった時点でObserverBは3,4,5の値を渡されて実行します。

Async Subject

イントロとしてはそこまで重要度高くなさそうなのでここでは省きます。

Schedular

処理の時間とか優先順位とかを決められる。
必要になったら調べるくらいで良さそう。

Testing

テストを実行するためには以下のパッケージをインストールする必要があります。テスト初めてだよって人はやっといてください。

npm install chai mocha ts-node @types/chai @types/mocha

あと、import { expect } from 'chai'も毎回追加するのが無難です。
そんでpackage.jsonscriptをデフォルトのものからこれに置き換えてください。

"scripts": {
    "test": "mocha --require ts-node/register --watch-extensions ts \"test/*.ts\""
  }

こうしておくことでワーキングディレクトリ直下のtestディレクトリーにある.tsファイルをnpm testコマンドでテストできるようになります。

さらに唐突に登場するhotcold。。。こちらの記事がわかりやすかったので参考にしてください。
テストについては基本的なことだけ書きます。覚えなきゃいけないことが多いので使いながら覚えていくくらいの姿勢で行こうと思います。
一番簡単ななコードを紹介してから説明していきます。

import { TestScheduler } from 'rxjs/testing';
import{it}from "mocha"
import{expect} from"chai"

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).deep.equal(expected);
});

it('generate the stream correctly', () => {
    testScheduler.run(({ hot,cold, expectObservable ,expectSubscriptions}) => {     
        const x = cold('--a--b--|');
        const xexpect ='--a--b--|';
        const xsubs =  '^-------!';
        expectObservable(x).toBe(xexpect);
        expectSubscriptions(x.subscriptions).toBe(xsubs);
        });
    }
)

最初のconstが始まったあたりからだけ意味を覚えれば良さそうに感じてるのでそこから説明を始めます。
-は1フレームを表します。時間のようなものだと思ってください。cold('--a--a--|')は最初の2フレーム何も吐かず3フレーム目でaを吐き次に5フレーム目でbを吐き8フレーム目でcompleteします。(|completeを表す。)
expectObservable(x).toBe(xexpect)xxexpectの値が等しいかテストできます。
次にsubscriptionの長さを調べます。^は購読の開始、!は購読の終了を表します。なのでexpectSubscriptions(x.subscriptions).toBe(xsubs)xが購読されてから終了するまでの長さがxsubと等しいか調べることができます。

感想

RxJS便利そう。基本的な使い方に限れば覚えないといけないことそんなになさそうだし早速使ってみたい。
テストも大事といろんなところで聞くのでもう少し勉強してから取り入れてみたい。

2
1
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1