はいさい!ちゅらデータぬオースティンやいびーん!
概要
最近、Angularで遊んでいてServiceの中でObservable
というインスタンスを始めて使いました。
これがなんなのかさっぱりわからず、ネットを明後日調べました。
すると、これが最高に面白い技術であることを知ったのです。
このObservable
は現在、ECMAScriptのステージ1提案になっており、数年前からネイティブのJavaScript機能として組み込んでいきたいという話が出ています。
本記事ではObservable
がなんなのか簡単に説明します。
Observable
とは何か
Observable
は、Promise
だと考えれば早いです。
じゃあPromiseと何が違うの?
Promise
は、resolve
で一つの結果しか返せないのに対して、Observable
は無限のresolve
を実行することができます。
以下の例でPromise
とObservable
で同じ結果が得られます。
Promise
const promise = new Promise((resolve) => {
resolve("hello");
})
promise.then((result) => console.log(result)); // hello
Observable
Observableはrxjs
からインポートします。
※ちなみに、RxJSは、あのマイクロソフトが作ったライブラリらしいです
import { Observable } from "rxjs";
const observable$ = new Observable<string>((observer) => {
observer.next("Hello");
});
observable$.subscribe((message) =>
console.log("Message from observable: ", message) // Message from observable: Hello
);
この二つの例を見ると、非常に似ていることがわかりますが、どこが違うのかというと、observer.next
を無限に呼べることです。
import { Observable } from "rxjs";
const observable$ = new Observable<string>((observer) => {
setInterval(() => observer.next(`Hello from ${new Date()}`), 5000);
});
observable$.subscribe((message) =>
console.log("Message from observable: ", message)
);
以下のような内容がコンソールに出力されます。
さらに、Observableを停止することができます。
import { Observable } from "rxjs";
const observable$ = new Observable<string>((observer) => {
setInterval(() => observer.next(`Hello from ${new Date()}`), 5000);
setTimeout(() => observer.complete(), 10000);
});
observable$.subscribe((message) =>
console.log("Message from observable: ", message)
);
しかも、Observableを停止した時に、Oberservableの中で動いているコードを止めるためのコールバック関数も設定できるのです。
import { Observable } from "rxjs";
const observable$ = new Observable<string>((observer) => {
const interval = setInterval(
() => observer.next(`Hello from ${new Date()}`),
5000
);
setTimeout(() => observer.complete(), 20000);
return () => {
clearInterval(interval);
console.log("Observable completed.");
};
});
observable$.subscribe((message) =>
console.log("Message from observable: ", message)
);
こうして、Promise
にない機能がいくつかあって、使いこなせば便利なのです。
ちなみに、$
を変数名の最後につけているのは、「これはObservable
なのだ」と伝える目的の慣例です。
Observableは、ジェネレーター関数の反対のようなもの
ジェネレータは以下のような使い方をします。
function* infinite() {
let index = 0;
while (true) {
yield index++;
}
}
const generator = infinite(); // "Generator { }"
console.log(generator.next().value); // 0
console.log(generator.next().value); // 1
console.log(generator.next().value); // 2
ゲネレータでは、generator.next()
を実行することで、箱から取り出すことができますが、Observable
では、observer.next()
で ものを箱に入れて「取り出せ」という命令まで出す のです。
Observableのまとめ
Observable
はPromise
と非常に似ているが、一つの解決ではなく、複数回解決することができる
これは理解しました。
しかしながら、読者には次のような疑問が泡のように膨れ上がってくるのではないでしょうか?
でも具体的にどういう場面でいいのこれ?
そこなのですが、Observable
はどのような問題を解決してくれるのか?という問いに回答した方がいいと思います。
Observableがどのような問題を解決してくれるのか
Observable
は、データが部分的かつ継続的に流入してくるような状況でそのデータを処理する問題を解決してくれます。
具体例でいいましょう。
EventListenerの例
Observable
はEventListener
を付けるのに便利です。
import { Observable } from "rxjs";
const button = document.querySelector("button")!;
const buttonClickObserver$ = new Observable<Event>((observer) => {
const clickEventListener: EventListener = (event) => observer.next(event);
button.addEventListener("click", clickEventListener);
return () => button.removeEventListener("click", clickEventListener);
});
buttonClickObserver$.subscribe((event) => console.log(event));
Observable
のコールバック関数を使って、buttonClickObserver$.unsubscribe
を実行した時にEventListener
を取り外すことができます。
上記と同じようなことが速記法で以下のように書けます。
import { fromEvent } from "rxjs";
const button = document.querySelector("button")!;
const buttonClickObserver$ = fromEvent(button, "click");
buttonClickObserver$.subscribe((event) => console.log(event));
これだけだとObservable
のメリットが伝わらないかと思いますが、この他に、buttonClickObserver$
が配信するEvent
を配列と同様に扱うことができます。
つまり、buttonClickObserver$
に対してfilter
、map
、およびreduce
のような配列特有の関数が使えます。
これは、KeyboardEvent
などにショートカットキーが押されたかどうかを判定したい時に非常に便利です。
import { fromEvent, filter } from "rxjs";
const input = document.querySelector("input")!;
const inputKeydownObserver$ = fromEvent<KeyboardEvent>(input, "keydown");
const shortcutListener$ = inputKeydownObserver$.pipe(
filter(
(event) =>
!event.isComposing && event.metaKey && event.key.toUpperCase() === "B"
)
);
shortcutListener$.subscribe((event) => {
event.preventDefault();
console.log("Bold shortcut!");
});
filter
を使って、KeyboardEvent
のキーがB
のキーかどうか、metaKey
が押されているかどうか、そしてIME入力の途中かどうかを確認して、各条件を満たしているEvent
のみsubscribe
に流すのです。
動画ストリーミングの例
動画ストリーミングがあるのではないでしょうか。
最近だと、複数の画質と音質のストリームを用意して、ユーザー側の通信スピード・帯域幅に合わせてどの画質と音質を使うのかを変えます。
これも、動画を再生しながらしますが、動画を小さな一部(チャンク)に分けてMediaSource
に足していきます。
そして、もうチャンクがないという時にサーバーから取得するのをやめます。
これがまさにObservable
が最高に適切な問題です。
以下のようにObservable
を使って解決しましょう。
const videoChunks$ = new Observable<{ chunkNo: number; buffer: ArrayBuffer }>(
(observer) => {
let chunkNo = 0;
const recursivelyFetchChunks = async () => {
try {
while (true) {
const result = await fetch(`/videos/my-video_part_${chunkNo}`);
if (!result.ok) throw Error(`Status error: ${result.status}`);
const buffer = await result.arrayBuffer();
observer.next({ chunkNo, buffer });
chunkNo++;
}
} catch (error) {
observer.error(error);
observer.complete(); // 404エラーが返ってきたら終わったのでObservableを終了させる
}
};
recursivelyFetchChunks();
}
);
const mediaSource = new MediaSource();
mediaSource.addEventListener(
"sourceopen",
() => {
const sourceBuffer = mediaSource.addSourceBuffer(
'video/mp4; codecs="avc1.42E01E, mp4a.40.2"'
);
videoChunks$.subscribe(({ chunkNo, buffer }) => {
console.log("Chunk received: ", chunkNo);
sourceBuffer.appendBuffer(buffer);
});
},
{ once: true }
);
const videoElement = document.querySelector("video")!;
const mediaSourceObjectURL = URL.createObjectURL(mediaSource);
videoElement.src = mediaSourceObjectURL;
※ 汚い例で筆者をお許しください。
上記の例は、最後のチャンクが何番なのかわからない前提で撮り続けるものです。
上記のような例で見られるように、非同期処理で継続的にデータを取得し続ける場合にとても役立ちます。
まとめ
少ししか説明できませんでしたが、要するに以下の点を押さえればよいかと思います。
- ObservableはPromiseに似ている
- 複数回解決して情報を流せる
- 一つの
Observable
に対して複数の受信者(Subscriber)をつけることができる - Observerableのコールバック関数で継続中の処理を停止することができる
筆者もObserverable
についてより一層勉強していきたいので、わかったことをちょこちょこ記事にできたらと思います。