僕もJavascript触り出したの最近で非同期処理という言葉を認識して勉強し出したのはこれがほぼ初めてなので勉強がてら書いていきます。間違っていることがあれば指摘してくれると嬉しいです。
原典はここです。https://rxjs-dev.firebaseapp.com/guide/overview
そもそも非同期処理って?
同期的な処理とは上から順に実行される処理です。反対に非同期処理とはクリックなどのイベントが起こるまで実行されない処理です。document.addEventListner("click",()=>{console.log("クリックされたよ!")})}
などが非同期処理の典型例です。
RxJSを使うと何が嬉しい?
非同期処理の流れがわかりやすく書ける。RxJSを用いないとコールバック地獄と呼ばれるIf文が複雑に絡み合った大変読みづらいプログラムになってしまう。
イントロダクションを行う上でのローカル環境構築
クリックとかのブラウザに関するイベントのために一応説明しておく。ただし、1回しか使わないので飛ばしてもらっても問題ないです。他はVSCodeのデバッガーで動きます。
あとあとAngularで使いたいと思って勉強してるのでAngular上で実行していく。(Angularのインストールは各々してください。)
もちろん、Angularを用いずに環境構築してくれても全く問題ない。
ターミナル上でng new rxjs-practice
でプロジェクトを作成し、ng g c rxjs
で作成したコンポーネントの中で実行するのが手軽だろう。以下がその例である。
<app-rxjs></app-rxjs>
import { Component, OnInit } from '@angular/core';
import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';
@Component({
selector: 'app-rxjs',
templateUrl: './rxjs.component.html',
styleUrls: ['./rxjs.component.css']
})
export class RxjsComponent implements OnInit {
constructor() { }
ngOnInit(): void {
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
scan(count => count + 1, 0)
)
.subscribe(count => console.log(`Clicked ${count} times`));
}
}
これだけ書いてng serve --open
と実行すればangularを触ったことがない人でもRxJSをローカルで動かすことができるようになるだろう。
全く同じ動作をする公式イントロダクションのなかのコードはこちらである。
import { fromEvent } from 'rxjs';
import { throttleTime, map, scan } from 'rxjs/operators';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map(event => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe(count => console.log(count));
ngOnInit
のなかにコードを書けばコンポーネント生成時にその中のコードが実行される。今回はRxJSをローカルで動かすことを目的としているのでこれ以上はAngularについては触れない。サンプルコードを適宜import
とngOnInit
に振り分けてください。
PullとPush
大まかに言えば
- Pull : 呼び出し元が呼び出すタイミングを決める。呼び出された側はいつ呼び出されるかわからない。(例:Generator)
- Push : 呼び出された側がいつ呼び出されるか決める。呼びだし元はいつ呼び出すかわからない。(例:Promiss)
RxJSはPushに属する。話をわかりやすくするために例を示す。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe({
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
});
console.log('just after subscribe');
出力は
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
となる。subscribe
によってobservable
が呼び出される(以下では購読されると言う)とただちに1,2,3
の値が購読者に渡され関数next(x)
の引数として実行される。しかし、その後、呼び出され側(以下ではObservableと呼ぶ)が1000msタイムアウトしているのでその間next(x)
は実行されず、そのあとのconsole.log('just after subscribe');
が実行される。1000ms後にnext(x)
のx
として4
が渡される。ここでObservable
は終了し購読者のcomplete() { console.log('done'); }
が実行される。これは値を作る側が処理の実行タイミングを決めているのPushなのである。
});
Observable
Observableの作成、購読
以下の例では一秒毎にcount
の値が1つ大きくなり購読者に渡される。observable
を作成する際はnew Observable(subscriber=>{//処理//subscriber.next()})
のようにすれば良い。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
let count = 0
let id = setInterval(() => {
count++
subscriber.next(count)
}, 1000);
}
);
observable.subscribe(x => console.log(x));
購読する際は今までもみてきたように
observable.subscribe(x=>f(x));
とすればよい。
また、複数の購読間で状態は共有されない。以下の例を見るとわかりやすいだろう。(以下ではimport
は省略する。コードエディター上でエラーが出ると思うので適切に対処してほしい。)
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1)
setTimeout(()=>{subscriber.next(2);},2000)
}
);
observable.subscribe(x=>{console.log(`A: ${x}`)})
observable.subscribe(x=>{console.log(`B: ${x}`)})
このコードの出力は以下である。
A:1
B:1
A:2
B:2
これを見るとたしかに購読が共有されていないことがわかる。observable.subscribe
はnew Observable(function subscribe(subscriber) {...})
の中で定義されたsubscriber
を呼び出すだけにすぎない。また、これはObservable execution
とも呼ばれる。一度の購読につき一度だけ実行されるが、これが渡す通知は以下の3種類である。
- "Next"通知: 何かしらの値を渡す。
- "Error"通知 : エラーもしくは例外を渡す。
- "Complete"通知 : 何も渡さない。
購読の終わらせ方
一度、始めた購読は何かしらの方法で終了させねばずっと継続されるが、購読はそれぞれ独立である。ただし、購読に変数名をつけることで購読を好きなタイミングで終了させることができる。const sub = observable.subscribe(x=>console.log("obserbver: ",x))
のように変数名をつけた後でsubscribe.unsubscribe()
とすれば良い。observable
の定義の中にあるreturn
の中にunsubscribe
時に実行したい処理を書くことができる。書かなければ購読が中止されるだけであり、以下の例ではこれをかかないと裏でInterval
が動き続ける。
const observable = new Observable(subscriber => {
let count = 0
const id = setInterval(() => {
count++
subscriber.next(count)
console.log("timer: ",count)
}, 1000)
return () => {
console.log("unsubscribe")
setTimeout(()=>{clearInterval(id)},2000)
}
})
const sub = observable.subscribe(x=>console.log("obserbver: ",x))
setTimeout(() => sub.unsubscribe(), 2000)
出力は
obserbver: 1
timer: 1
unsubscribe
timer: 2
timer: 3
となる。この出力を見てわかるようにunsubscribe
されたからと言って内部の処理がすぐに止まるわけではないので、内部でAddEventListenr
などを使っている時はremoveEveentLister
をreturn
のあとに書くことを忘れないようにしてください。
Observer
observer
とは単に以下の形をした連想配列である。(いくつかの要素が欠けていても問題ないが、欠けた要素から通知が来ても反応しない。例えば、絶対に失敗しないことがわかっているのならerror
の処理を書かなくても良い。)
const observer = {
next?: x => f(x),//成功した時の処理
error?: err => g(err),//エラーや例外が発生した時の処理
complete?: () => h(),//subscribeが終了した時の処理
};
observer
を使うには以下のように
observable.subscribe(observer);
subscribe
の中にいれればよい。
Operator
Operator
は以下の2種に分類できる
- Creation Operators
: 配列などを元に新しいObservable
を作る。
- Pipeable Operators
: Observable
を元に新しいObservable
を作る。
Creation Operators
とりあえず、of
とfrom
だけ説明しておく
使い方は見たまんまである。
import {of,from} from 'rxjs';
let ofObservable = of(1,2,3)
let fromObservable = from([1,2,3])
Pipeable Oparators
Observable
から送られてきた値を加工してに新たなObservableを作る。元のObservableは変化していない。
使い方は以下のようにする。
of(1,2,3).pipe(map(x=>x*x)).subscribe(x=>console.log(x))
コンソールは
1
4
9
である。他にも色々種類があるが、多すぎるので適宜紹介していきたい。
また、Pipeable Operator
はつなげることができる。たとえば、以下のように用いる
of(1,2,3).pipe(map(x=>x*x),filter(x=>x>3).subscribe(x=>console.log(x))
コンソールは
4
9
である。
Pipeable Operatorの作り方
せっかくなのでオペレータ(pipeable operator)の作り方も説明する。基本的には自作する必要はないだろうが、作り方を知っておくことでオペレータに対する理解が深まるはずだ。オペレータとは何かというとObservableを受け取ってObservableを返す関数を返す関数だ。
見本としてmap
オペレータを自作してみる。実際のものも似たような実装になってるんじゃないかなあ(憶測)
import { Observable ,of} from 'rxjs'
function newMap<T,U>(f:(x:T)=>U){
return (observable:Observable<T>) => new Observable<U>(observer=>{
const subscription = observable.subscribe({
next(value){
const out = f(value)
observer.next(out)
},
error(err){observer.error(err)},
complete(){observer.complete}
})
return ()=>{
subscription.unsubscribe()
console.log("unsubscribe")
}
})
}
let a = newMap((x:number)=>(x*x))(of(1,2,3)).subscribe((x)=>{console.log(x)})
a.unsubscribe()
返ってくる関数を見てみるとobservable
を引数に取り、戻り値となるObservable
は購読されるとobservable
の購読を開始し渡された値value
を関数f(value)
に代入し、その値をObservable
の値としてその購読者へと渡す。最後のreturn
の中にはunsubscribe
メソッドが呼び出された時の処理を書く。今回はobservable
への購読を終了し、尚且つコンソールにunsubscribe
と出力されるようにした。
出力は
1
4
9
unsubscribe
となり、実際に動くことがわかる。
Subscription
先ほども見たが
let a = observable.sebscribe(observable)
のa
のことである。このようにすることでa.unsubscribe()
で購読をやめることができる。
しかし、他にも使い道がある。たとえば、一つの購読をやめたとき他の購読も一緒にやめたいなどの処理をするときだ。それには以下のようにadd
を用いる。
import { interval } from 'rxjs';
const observable1 = interval(400);
const observable2 = interval(300);
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);
コンソールは以下の通りである。
second: 0
first: 0
second: 1
first: 1
second: 2
上のコードではsubscription.unsubscription()
をした時に同時にchildsubscription
の購読も終了する。add
があるといことはremove
もある。働きは想像通りである。
Subject
Subject
とは複数の購読間で共有できる特殊なObservable
である。
観測者の立場からするとただObservable
とSubject
のどちらか区別できない。内部実装の話をするとSubject
にたいする購読は値を届ける新しい実行を始めない。通常のAddListner
などのように購読のリストを保持するだけである。
また、Subject
では購読側からnext
メソッドを使って新しく値を渡すことができる。
const subject = new Subject<number>();
let a = subject
a.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let b = subject
b.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
a.next(1);
a.next(2);
この操作コードの出力はどうなるであろうか?答えは
observerA:1
observerB:1
observerA:2
observerB:2
となる。一つの購読からObservable
に値を渡すと全体の購読間で値が共有されるのだ。これはSPAでコンポーネント間でイベントやデータを共有したい時に用いることができる。
refCount()
あたりの話は最初はイントロとしては盛り込みすぎかなと思うので省きます。
ただ、connect()
とrefCount()
あたりのコードをTypeScriptで実行すると、型エラーが発生します。これは何故かと言うとconnect()
メソッドを使うためにはConnectableSubject
型である必要があり、multicast(subject)
を行うとこの型に変換されるはずですが、pipe
の中でオペレータを使用しても適切に型推定がされないから起こります。なので、ここでは型アサーションを行うか、pipe
を使わずにmulticast(subject)(source)
と書く必要があります。
BehaviorSubject
最新の値を持つことができる便利なSubject
です。
値を取得するには以下のように.value
で読み出してください。
const subject = new BehaviorSubject(0);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1)
let b = subject.value
console.log("value:",b)
subject.next(2)
出力は
observerA: 0
observerA: 1
value:1
observerA: 2
ReplaySubject
複数の値を保持しておき、新たな購読が始まった際にいままでの値を全て新しい購読に渡します。
どれだけの数の値を保持するか、何ミリ秒前の値まで保持するかということも指定することができます。
使い方は下のコードを見ながら解説していきます。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
出力
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
最初のconst subject = new ReplaySubject(100, 500 /* windowTime */);
で値を100個までかつ500ms前の値までを保持するとしていします。
その後、200msに一度値がObservableに渡されるのでObserverAは通常通り値を受け取ります。しかし、1000msに到達した時ObserverBの購読が始まります。ここからがただのSubject
と挙動が異なります。500ms前までのSubject
の値を持っているので購読が始まった時点でObserverBは3,4,5
の値を渡されて実行します。
Async Subject
イントロとしてはそこまで重要度高くなさそうなのでここでは省きます。
Schedular
処理の時間とか優先順位とかを決められる。
必要になったら調べるくらいで良さそう。
Testing
テストを実行するためには以下のパッケージをインストールする必要があります。テスト初めてだよって人はやっといてください。
npm install chai mocha ts-node @types/chai @types/mocha
あと、import { expect } from 'chai'
も毎回追加するのが無難です。
そんでpackage.json
のscript
をデフォルトのものからこれに置き換えてください。
"scripts": {
"test": "mocha --require ts-node/register --watch-extensions ts \"test/*.ts\""
}
こうしておくことでワーキングディレクトリ直下のtest
ディレクトリーにある.ts
ファイルをnpm test
コマンドでテストできるようになります。
さらに唐突に登場するhot
とcold
。。。こちらの記事がわかりやすかったので参考にしてください。
テストについては基本的なことだけ書きます。覚えなきゃいけないことが多いので使いながら覚えていくくらいの姿勢で行こうと思います。
一番簡単ななコードを紹介してから説明していきます。
import { TestScheduler } from 'rxjs/testing';
import{it}from "mocha"
import{expect} from"chai"
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).deep.equal(expected);
});
it('generate the stream correctly', () => {
testScheduler.run(({ hot,cold, expectObservable ,expectSubscriptions}) => {
const x = cold('--a--b--|');
const xexpect ='--a--b--|';
const xsubs = '^-------!';
expectObservable(x).toBe(xexpect);
expectSubscriptions(x.subscriptions).toBe(xsubs);
});
}
)
最初のconst
が始まったあたりからだけ意味を覚えれば良さそうに感じてるのでそこから説明を始めます。
-
は1フレームを表します。時間のようなものだと思ってください。cold('--a--a--|')
は最初の2フレーム何も吐かず3フレーム目でa
を吐き次に5フレーム目でb
を吐き8フレーム目でcomplete
します。(|
はcomplete
を表す。)
expectObservable(x).toBe(xexpect)
でx
とxexpect
の値が等しいかテストできます。
次にsubscriptionの長さを調べます。^
は購読の開始、!
は購読の終了を表します。なのでexpectSubscriptions(x.subscriptions).toBe(xsubs)
はx
が購読されてから終了するまでの長さがxsub
と等しいか調べることができます。
感想
RxJS便利そう。基本的な使い方に限れば覚えないといけないことそんなになさそうだし早速使ってみたい。
テストも大事といろんなところで聞くのでもう少し勉強してから取り入れてみたい。