Cycle.js の勉強を始めて、RxJS の基本練習の量をこなすことが必要になり、いろいろ試したことを記録に残すことにしました。最小限どんなオペレーターを学べばよいのかは RxJS の開発者のあいだでも課題として考えられており、Staltz さんは RxJS の軽量バージョンの xstream を公開しています。xstream の紹介記事はこちらをご参照ください。
セットアップ
babel-node
ターミナルで ES6 (ES2015) のスクリプトを試すには babel-node
が便利です。利用するには babel-cli
をインストールします。
npm install -g babel-cli
トランスパイルのためにはプレセットを導入する必要があります。@
の後ろは Node.js のバージョンです。
npm install --save-dev babel-preset-es2015-node@6
スクリプトの実行は次のとおりです。
babel-node client.js --presets es2015-node
プレセットは package.json
で指定することができます。
"babel": {
"presets": ["es2015-node"]
}
RxJS
5.0 系の場合、ES6 (ES2015) モジュールと CommonJS の両方のパッケージが用意されています。
ES6 (ES2015) のパッケージを導入する場合、次のコマンドを実行します。
npm install rxjs-es
モジュールをインポートするには次のように書きます。
import Rx from 'rxjs/Rx';
Rx.Observable.of(1,2,3);
CommonJS のパッケージを導入する場合、次のコマンドを実行します。
npm install rxjs
モジュールの読み込みは次のとおりです。
let Rx = require('rxjs/Rx');
Rx.Observable.of(1,2,3);
4.0 系の場合、次のとおりです。
npm install rx
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>
RxJS-DOM
npm install rx-dom
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-dom/7.0.3/rx.dom.min.js"></script>
RxJS-jQuery
npm install rx-jquery
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-jquery/1.1.6/rx.jquery.min.js"></script>
Fetch API
Can I use のサイトで Fetch API に対応するブラウザーの一覧 を調べることができます。古いブラウザーのために、Github が polyfill を公開しています。
定義されるメソッドの一覧
RxJS は こちら、RxJS-DOM はこちら、RxJS-jQuery はこちらを参照。
生成
配列、シーケンス
Rx.Observable.from
を使って配列から Observable を生成できます。
var Rx = require('rx');
var array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
var source = Rx.Observable.from(array);
var subscription = source.subscribe(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); }
);
EcmaScript 6 (2015) であれば、Array.from
を使って配列を生成できます。
var array = Array.from({length: 10}, (v, k) => k + 1);
from
の第2引数で map
のコールバックに相当する関数を指定できます。
var source = Rx.Observable.from(array, function (x) { return x * x; });
シーケンスの Observable を直接生成することもできます。
var source = Rx.Observable.from({length: 10}, function(v, k) { return k + 1; });
要素の個数が少ないのであれば、Rx.Observable.of
で指定する選択肢がある。
var source = Rx.Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
プロミス
var promise = new Promise(function (resolve, reject) {
resolve(10);
});
var source = Rx.Observable.fromPromise(promise);
var subscription = source.subscribe(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted');
});
var promise2 = new Promise(function (resolve, reject) {
reject(new Error('reason'));
});
var source2 = Rx.Observable.fromPromise(promise2);
var subscription2 = source2.subscribe(
function (x) { console.log('onNext: %s', x); },
function (e) { console.log('onError: %s', e); },
function () { console.log('onCompleted'); }
);
ジェネレーター
function* range() {
var index = 0;
while(true) {
yield index++;
}
}
Rx.Observable.from(range())
.take(10)
.subscribe(function (x) {
console.log('値: %s', x);
});
Ajax
httpbin.org
で練習します。
GET
ES6 Promise に対応している Fetch API を使ってみましょう。Promise から Observable を生成するには fromPromise
を使います。
var promise = fetch('http://httpbin.org/get')
.then(function(response) {
return response.json();
}).then(function(json) {
return json.origin;
}).catch(function(ex) {
return ex;
});
Rx.Observable.fromPromise(promise)
.subscribe(
function (value) {
console.log(value);
},
function (err) {
console.log('エラー: %s', err);
},
function () {
console.log('完了');
});
Rx.Observable.spawn
を使ってジェネレーターによる非同期処理をやってみよう。
var Rx = require('rx');
var request = require('request');
var get = Rx.Observable.fromNodeCallback(request);
Rx.Observable.spawn(function* () {
var data;
try {
data = yield get('http://httpbin.org/get').timeout(5000 /*ms*/);
} catch (e) {
console.log('Error %s', e);
}
console.log(JSON.parse(data[0].body).origin);
}).subscribe();
POST
データの保存に FormData
を使ってみよう。Content-Type
の値は multipart/form-data
になります。
var data = new FormData();
data.append('input', 'bar');
var promise = fetch('http://httpbin.org/post', {
method: 'post',
body: data
}).then(function(response) {
return response.json()
})
.then(function(json) {
return json.form;
})
.catch(function(ex) {
return ex;
});
Rx.Observable.fromPromise(promise)
.subscribe(
function (value) {
console.log(value);
},
function (err) {
console.log('エラー: %s', err);
},
function () {
console.log('完了');
});
練習問題
FizzBuzz
FizzBuzz から練習をはじめよう。1 から 100 までの整数で構成されるシーケンスを生成するには Rx.Observable.range
を使う。
var Rx = require('rx');
var source = Rx.Observable.range(1, 100);
source.subscribe(function(value) {
if (value % 15 === 0) {
console.log('fizzbuzz');
} else if (value % 3 === 0) {
console.log('fizz');
} else if (value % 5 === 0) {
console.log('buzz');
} else {
console.log(value);
}
});
map を使って FizzBuzz の判定を複数の関数に分割してみよう。
var Rx = require('rx');
var source = Rx.Observable.range(1, 100);
source
.map(function(value) {
return value % 15 === 0 ? 'fizzbuzz' : value;
})
.map(function(value) {
return value % 3 === 0 ? 'fizz' : value;
})
.map(function(value) {
return value % 5 === 0 ? 'buzz' : value;
})
.subscribe(function(value) {
console.log(value);
});
複雑なシーケンスを生成できるように Rx.Observable.generate
が用意されている。
var Rx = require('rx');
var source = Rx.Observable.generate(
1,
function (value) { return value < 101; },
function (value) { return value + 1; },
function (value) {
if (value % 15 === 0) {
return 'FizzBuzz';
} else if (value % 3 === 0) {
return 'Fizz';
} else if (value % 5 === 0) {
return 'Buzz';
} else {
return value;
}
}
);
source.subscribe(function (x) {
console.log(x);
});
今度は配列を繰り返す方式に取り組んでみよう。ES6 のジェネレーターで無限のシーケンスを生成し、take
で最初の100をとる。
var Rx = require('rx');
function *list()
{
var list = [
'', '', 'Fizz', '', 'Buzz',
'Fizz', '', '', 'Fizz', 'Buzz',
'', 'Fizz', '', '', 'FizzBuzz'
];
while (true) {
for (var e of list) {
yield e;
}
}
}
Rx.Observable.from(list())
.take(100)
.map(function(value, index) {
return value === '' ? index + 1: value;
})
.subscribe(function(value) {
console.log(value);
});
今度は Fizz と Buzz の生成を分割してみよう。それぞれ別のシーケンスを生成した後で Rx.Observable.withLatestFrom
を使って統合する。
var Rx = require('rx');
function *fizz()
{
var list = ['', '', 'Fizz'];
while (true) {
for (var e of list) {
yield e;
}
}
}
function *buzz()
{
var list = ['', '', '', '', 'Bizz'];
while (true) {
for (var e of list) {
yield e;
}
}
}
var source1 = Rx.Observable.from(fizz());
var source2 = Rx.Observable.from(buzz());
var source = source1.withLatestFrom(
source2,
function (a, b) { return a + b; }
);
source
.take(100)
.map(function(value, index) { return value === '' ? index + 1: value; })
.subscribe(function (x) {
console.log('%s', x);
});
現在時刻を表示する
1秒単位で現在時刻を表示してみましょう。
<div id="result"></div>
var result = document.getElementById('result');
var source = Rx.Observable
.interval(1000)
.timeInterval();
source.subscribe(
function (x) {
result.innerHTML = (new Date()).toLocaleString();
},
function (err) {
console.log('エラー: ' + err);
},
function () {
console.log('完了');
});
ユーザーの入力を表示する
テキストボックスの入力をそのまま表示させてみましょう。Cycle.js のトップページにもコードサンプルが掲載されています。
<input id="textInput" type="text">
<div id="result"></div>
var textInput = document.querySelector('#textInput');
var result = document.querySelector('#result');
Rx.Observable.fromEvent(textInput, 'input')
.pluck('target', 'value')
.startWith('')
.subscribe(function(value) {
result.innerHTML = value;
});
pluck の代わりに map
を使うこともできます。
.map(function(ev) { return ev.target.value; })
クリックカウンター
ボタンをクリックするとカウンターの値が1つ増えるものとします。
<button id="plus">増やす</button>
<div id="result">0</div>
var plus = document.getElementById('plus');
var result = document.getElementById('result');
var source = Rx.Observable.fromEvent(plus, 'click')
.map(function() { return +1; })
.startWith(0)
.scan(function(acc, value) { return acc + value; })
.subscribe(function(value) { result.innerHTML = value; });
次にクリックするとカウンターの値が1減るボタンおよびカウンターの値が0になるリセットボタンを追加してみましょう。
<button id="plus">増やす</button>
<button id="minus">減らす</button>
<button id="reset">リセット</button>
<div id="counter"></div>
var counter = document.getElementById('counter');
var plus = Rx.Observable
.fromEvent(document.getElementById('plus'), 'click').map(function() { return +1; });
var minus = Rx.Observable
.fromEvent(document.getElementById('minus'), 'click').map(function() { return -1; });
var reset = Rx.Observable
.fromEvent(document.getElementById('reset'), 'click').map(function() { return 0; });
Rx.Observable.merge(plus, minus, reset)
.startWith(0)
.scan(function(acc, value) { return value === 0 ? 0 : acc + value; })
.subscribe(function(value) { counter.innerHTML = value; });
オペレーターのトップ10
Staltz 氏はトップ10のオペレーターを挙げています。
- flatMap
- merge
- scan
- combineLatest
- withLatestFrom
- map
- filter
- just
- share
- shareReplay
just
単一の値を含むシーケンスを返します。
var source = Rx.Observable.just(42);
source.subscribe(function (x) {
console.log('Next: %s', x);
});
map
var source = Rx.Observable.range(1, 3)
.map(function (value, index, observable) {
return value * value;
});
source.subscribe(function (value) {
console.log(value);
});
filter
var source = Rx.Observable.range(1, 5)
.filter(function (value, index, observable) {
return value % 2 === 0;
});
source.subscribe(function (value) {
console.log(value);
});
scan
Obsevable のシーケンスにアキュムレーター関数を適用して、それぞれの中間的な値を返します。
var source = Rx.Observable.range(1, 10)
.scan(function (acc, value, index, source) { return acc + value; }, 0);
source.subscribe(function (value) {
console.log(value);
});
merge
複数の Observable を1つの Observable にマージします。
var source = Rx.Observable.of(1, 3, 5);
var source2 = Rx.Observable.of(2, 4, 6);
var source3 = source.merge(source2);
source3.subscribe(function (value) {
console.log(value);
});
combineLatest
var source1 = Rx.Observable.interval(100)
.map(function (i) { return 'First: ' + i; });
var source2 = Rx.Observable.interval(200)
.map(function (i) { return 'Second: ' + i; });
var source = Rx.Observable.combineLatest(
source1,
source2
).take(4);
source.subscribe(function (value) {
console.log(value);
});
withLatestFrom
var source1 = Rx.Observable.interval(100)
.map(function (i) { return 'First: ' + i; });
var source2 = Rx.Observable.interval(200)
.map(function (i) { return 'Second: ' + i; });
var source = source1.withLatestFrom(
source2,
function (s1, s2) { return s1 + ', ' + s2; }
).take(5);
source.subscribe(function (value) {
console.log(value);
});
flatMap
var source = Rx.Observable.range(1, 5)
.flatMap(function (x, i) {
return Promise.resolve(x + i);
});
source.subscribe(function (value) {
console.log(value);
});
share
var source = Rx.Observable.range(1, 5);
var published = source.share();
function createObserver(tag) {
return Rx.Observer.create(
function (x) {
console.log('Next: ' + tag + x);
});
}
source.subscribe(createObserver('SourceA'));
source.subscribe(createObserver('SourceB'));
published.subscribe(createObserver('SourceC'));
published.subscribe(createObserver('SourceD'));
その他
just (return) と subscribe (forEach)
単一の値を返すには just、戻り値を伴わずに引数を展開したい場合、subscribe を使います。
scan と reduce の使いわけ
集約値の途中経過が必要な場合、scan、不要な場合、reduce とのことです。
zip を使う機会はほとんどない
Staltz 氏のつぶやきによると、zip
を使う機会はほとんどなく combineLatest
もしくは withLatestFrom
もしくは flatMap
を使うことを検討したほうがよいとのことです。
JWT 認証
Authenticable Observables: Build a Reactive App with JWT Auth は Fetch API を使うチュートリアルです。
hot と cold
Creating and Subscribing to Simple Observable Sequences にアナロジーによる説明があります。
- Cold Observable: ムービー
- Hot Observable: ライブパフォーマンス
- リプレイされる Hot Observable: ビデオに記録されたライブパフォーマンス
Subject・イベントバス を避ける
Erik Meijer によると Subject は Rx の世界のミュータブル名変数であり、たいていの場合、必要ないとのことです。Bacon.js のチュートリアルでイベントバスを避けるべきである理由がくわしく書かれています。