RxJSはObservableオブジェクトを軸にしたリアクティブプログラミングのJavaScriptライブラリです。非同期の処理やイベントにもとづくコードが簡単に組み立てられます。本稿では、RxJSのObservableでweb APIのデータを読み込んでみます。なお、RxJSについて、詳しくは「RxJS 6入門」01〜07をお読みください。
01 RxJSライブラリをCDNで読み込む
RxJSを手っ取り早く試すには、CDNから読み込むのが手軽でしょう。HTMLドキュメントの<head>要素に、<script>要素をつぎのように加えます。本稿執筆時の最新リリースバージョンはRxJS 6.2.2です。
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
npmなどを用いたそのほかのインストールの仕方についてはGitHubのRxJS 6「Installation and Usage」または「Installation」をご覧ください。ただし、後者のCDNはバージョン5.0.0のままリンクが切れているようです。
02 簡単な非同期の処理にObservableを使ってみる
まずは、Observableを使わない非同期の処理です。つぎの関数(async())に、setTimeout()メソッドで時間待ちの処理を定めました。関数の引数に渡されたミリ秒ののちに、console.log()メソッドで文字列が出力されます。
function async(arg) {
	console.log('start');
	setTimeout(() =>
		console.log(`${Math.floor(arg / 1000)} second delay is up`)
	, arg);
}
async(1000);
// コンソール出力
start
1 second delay is up
RxJSの処理はObservableオブジェクトをつくることから始まります。そのためのメソッドはObservable.create()です。渡す関数には、データをどう処理して送るのか定めます。データを送るメソッドが、関数の引数(Observerオブジェクト)に対して呼び出すObserver.next()です。
Observableの処理を実行するには、Observable.subscribe()メソッドを呼び出さなければなりません。引数に渡す関数が、Observer.next()により送られてくるデータにどう処理を加えるのか定めます。
const {Observable} = rxjs;
function async(arg) {
    return Observable.create((observer) => {
        console.log('start');
        setTimeout(() => {
            observer.next(arg);
        }, arg);
    });
}
async(1000)
.subscribe(
    (arg) => console.log(`${Math.floor(arg / 1000)} second delay is up`)
);
// コンソール出力
start
1 second delay is up
03 XMLHttpRequestとObservableでweb APIのデータを読み込む
つぎは、webで公開されているAPIをXMLHttpRequestオブジェクトで読み込んでみます。APIとして用いるのは、「Open Notify」の「How Many People Are In Space Right Now」です。NASAのデータにもとづいて、今宇宙でどれだけの人たちが働いているのかわかります。XMLHttpRequestクラスの使い方について、詳しくは「XMLHttpRequest の利用」をお読みください。
XMLHttpRequestオブジェクトでデータを読み込み終えたときの処理は、loadイベントのリスナーとして定めます。XMLHttpRequest.responseプロパティでJSONデータが得られますので、調べたいプロパティ値を取り出せばよいでしょう。
const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
	return Rx.Observable.create((observer) => {
		const request = new XMLHttpRequest();
		request.addEventListener('load', (event) =>
			observer.next(request.response)
		);
		request.open('GET', url);
		request.send();
	});
}
getSpacePeople(url)
.subscribe(
	(data) => console.log(JSON.parse(data).people)
);
04 イベント待ちのオブジェクトをObservableにする
fromEvent()関数は、イベントを待つオブジェクトのObservableをつくります。引数は、オブジェクトとイベントのふたつです。前掲コードのXMLHttpRequestオブジェクトがloadイベントを待つ処理はつぎのように書き替えられます。
const {Observable, fromEvent} = rxjs;
const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
    return Observable.create((observer) => {
        const request = new XMLHttpRequest();
        /* request.addEventListener('load', (event) =>
            observer.next(request.response)
        ); */
        fromEvent(request, 'load')
        .subscribe((event) => observer.next(request.response));
        request.open('GET', url);
        request.send();
    });
}
getSpacePeople(url)
.subscribe(
    (data) => console.log(JSON.parse(data).people)
);
05 処理の失敗を送る
Observableの処理の失敗は、Observer. error()で送ります。受け取ったエラーは、Observable.subscribe()メソッドの第2引数に渡す関数が扱います。前掲コードにエラーの処理を加えたのがつぎのコードです。実際の動きを確かめてコードが試せるように、以下のサンプル001をjsdo.itに掲げました。
const {Observable, fromEvent} = rxjs;
const url = 'http://api.open-notify.org/astros.json';
function getSpacePeople(url) {
    return Observable.create((observer) => {
        const request = new XMLHttpRequest();
        fromEvent(request, 'load')
        .subscribe((event) => observer.next(request.response));
        fromEvent(request, 'error')
        .subscribe((event) => observer.error(request));
        request.open('GET', url);
        request.send();
    });
}
getSpacePeople(url)
.subscribe(
    (data) => console.log(JSON.parse(data).people),
    (request) => console.error('エラー:', request.status)
);

