38
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJSで始めるRx(Reactive Extensions)入門2

Last updated at Posted at 2015-10-04

前回(RxJSで始めるRx(Reactive Extensions)入門1 - Qiita)はRxJSの基本部分の説明をしたので、今回はすこし応用的なところに行きましょう。

stream生成

RxJSでは様々なオペレータを駆使してstreamを加工することで目的を達成するのですが、現存するライブラリのほとんどでは残念ながらRxのstreamは提供されていないので、ライブラリをstreamに変換するアダプターが必要となります。また自分でライブラリを書く場合にも必要になりますね。

Rx.Observable.create()

今日紹介する方法の一つ目はcreateメソッドです。このメソッドはストリームを生成します。詳しい使い方を見てみましょう。

Rx = require('rx');
fs = require('fs');

observable = Rx.Observable.create(function(obs) {
  fs.readFile("hoge.txt", function(err, text) {
    if (err) {
      obs.onError(err);
      return;
    }
    text.toString().split("\n").forEach(function(line) {
      obs.onNext(line);
    });
    obs.onCompleted();
  });
});

observable.filter(function(line){
  return line.substr(0, 2) != '# ';
}).subscribe(function(line){
  console.log(line);
});

このプログラムはhoge.txtのテキストを読み込んで、#とスペースで始まらない行のみを表示するものです。

Rx.Observable.createは引数に関数を取りますが、この関数の中でストリームを制御します。制御するための仕組みはこの関数が受け取る引数obsが持っています。

obs.onNext()でデータを送ります。今回のプログラムだとnodeのfs.readFileメソッドを実行し、コールバック内で読み込んだtextを改行で分解し、一行ずつonNextしています。すべて読み込んだら、次はobs.onCompleted()でストリームを終了させます。また、エラーがある場合はobs.onError()でエラーを送ることができます。

Rx.Subject

Rxの世界で一般的に使われるのがSubjectです(たぶん)。Subjectは上述のonNextなどのメソッドを使えると同時にsubscribeすることもできます。

Rx = require('rx');
fs = require('fs');

Hoge = function() {
  this.subject = new Rx.Subject();
  this.observable = this.subject.publish();
  this.observable.connect();
}

Hoge.prototype.read = function(filename) {
  self = this
  fs.readFile(filename, function(err, text) {
    if (err) {
      self.subject.onError(err);
      return;
    }
    text.toString().split("\n").forEach(function(line) {
      self.subject.onNext(line);
    });
    self.subject.onCompleted();
  });
}

var hoge = new Hoge();
hoge.observable.filter(function(line){
  return line.substr(0, 2) != '# ';
}).subscribe(function(line){
  console.log(line);
});
hoge.read("hoge.txt");

このコードではsubjectをいったん、publish()したものをobservableとしています。このやり方が正しいのかどうかは知らないのですが、インスタンスを利用する側で、onNext()ができたりするのはあまりよくないと思ってSubjectとしての機能をいったん封印しています。connect()は、publish()した後はconnect()しないとストリームが流れてくれないのでしています。Hot/Coldがどうたらとか?(間違ってたら突っ込みください)

Subjectの利点はcreateとは違って、例えばsubjectを保有するオブジェクトに対するメソッド呼び出しに応じて自由にストリームを操作できる点ですね。

今回はひとまずここまでにしようと思います。

38
40
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
38
40

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?