前回(RxJSで始めるRx(Reactive Extensions)入門1 - Qiita)はRxJSの基本部分の説明をしたので、今回はすこし応用的なところに行きましょう。
stream生成
RxJSでは様々なオペレータを駆使してstreamを加工することで目的を達成するのですが、現存するライブラリのほとんどでは残念ながらRxのstreamは提供されていないので、ライブラリをstreamに変換するアダプターが必要となります。また自分でライブラリを書く場合にも必要になりますね。
Rx.Observable.create()
今日紹介する方法の一つ目はcreate
メソッドです。このメソッドはストリームを生成します。詳しい使い方を見てみましょう。
Rx = require('rx');
fs = require('fs');
observable = Rx.Observable.create(function(obs) {
fs.readFile("hoge.txt", function(err, text) {
if (err) {
obs.onError(err);
return;
}
text.toString().split("\n").forEach(function(line) {
obs.onNext(line);
});
obs.onCompleted();
});
});
observable.filter(function(line){
return line.substr(0, 2) != '# ';
}).subscribe(function(line){
console.log(line);
});
このプログラムはhoge.txtのテキストを読み込んで、#とスペースで始まらない行のみを表示するものです。
Rx.Observable.createは引数に関数を取りますが、この関数の中でストリームを制御します。制御するための仕組みはこの関数が受け取る引数obs
が持っています。
obs.onNext()
でデータを送ります。今回のプログラムだとnodeのfs.readFile
メソッドを実行し、コールバック内で読み込んだtextを改行で分解し、一行ずつonNext
しています。すべて読み込んだら、次はobs.onCompleted()
でストリームを終了させます。また、エラーがある場合はobs.onError()
でエラーを送ることができます。
Rx.Subject
Rxの世界で一般的に使われるのがSubject
です(たぶん)。Subject
は上述のonNext
などのメソッドを使えると同時にsubscribe
することもできます。
Rx = require('rx');
fs = require('fs');
Hoge = function() {
this.subject = new Rx.Subject();
this.observable = this.subject.publish();
this.observable.connect();
}
Hoge.prototype.read = function(filename) {
self = this
fs.readFile(filename, function(err, text) {
if (err) {
self.subject.onError(err);
return;
}
text.toString().split("\n").forEach(function(line) {
self.subject.onNext(line);
});
self.subject.onCompleted();
});
}
var hoge = new Hoge();
hoge.observable.filter(function(line){
return line.substr(0, 2) != '# ';
}).subscribe(function(line){
console.log(line);
});
hoge.read("hoge.txt");
このコードではsubject
をいったん、publish()
したものをobservable
としています。このやり方が正しいのかどうかは知らないのですが、インスタンスを利用する側で、onNext()
ができたりするのはあまりよくないと思ってSubject
としての機能をいったん封印しています。connect()
は、publish()
した後はconnect()
しないとストリームが流れてくれないのでしています。Hot/Coldがどうたらとか?(間違ってたら突っ込みください)
Subject
の利点はcreate
とは違って、例えばsubject
を保有するオブジェクトに対するメソッド呼び出しに応じて自由にストリームを操作できる点ですね。
今回はひとまずここまでにしようと思います。