Help us understand the problem. What is going on with this article?

RxJS の練習

More than 3 years have passed since last update.

Cycle.js の勉強を始めて、RxJS の基本練習の量をこなすことが必要になり、いろいろ試したことを記録に残すことにしました。最小限どんなオペレーターを学べばよいのかは RxJS の開発者のあいだでも課題として考えられており、Staltz さんは RxJS の軽量バージョンの xstream を公開しています。xstream の紹介記事はこちらをご参照ください。

セットアップ

babel-node

ターミナルで ES6 (ES2015) のスクリプトを試すには babel-node が便利です。利用するには babel-cli をインストールします。

npm install -g babel-cli

トランスパイルのためにはプレセットを導入する必要があります。@ の後ろは Node.js のバージョンです。

npm install --save-dev babel-preset-es2015-node@6

スクリプトの実行は次のとおりです。

babel-node client.js --presets es2015-node

プレセットは package.json で指定することができます。

package.json
"babel": {
  "presets": ["es2015-node"]
}

RxJS

5.0 系の場合、ES6 (ES2015) モジュールと CommonJS の両方のパッケージが用意されています。

ES6 (ES2015) のパッケージを導入する場合、次のコマンドを実行します。

npm install rxjs-es

モジュールをインポートするには次のように書きます。

import Rx from 'rxjs/Rx';

Rx.Observable.of(1,2,3);

CommonJS のパッケージを導入する場合、次のコマンドを実行します。

npm install rxjs

モジュールの読み込みは次のとおりです。

let Rx = require('rxjs/Rx');

Rx.Observable.of(1,2,3);

4.0 系の場合、次のとおりです。

npm install rx
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>

RxJS-DOM

npm install rx-dom
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-dom/7.0.3/rx.dom.min.js"></script>

RxJS-jQuery

npm install rx-jquery
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-jquery/1.1.6/rx.jquery.min.js"></script>

Fetch API

Can I use のサイトで Fetch API に対応するブラウザーの一覧 を調べることができます。古いブラウザーのために、Github が polyfill を公開しています。

定義されるメソッドの一覧

RxJS は こちら、RxJS-DOM はこちら、RxJS-jQuery はこちらを参照。

生成

配列、シーケンス

Rx.Observable.from を使って配列から Observable を生成できます。

var Rx = require('rx');

var array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
var source = Rx.Observable.from(array);

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); }
);

EcmaScript 6 (2015) であれば、Array.from を使って配列を生成できます。

var array = Array.from({length: 10}, (v, k) => k + 1);

from の第2引数で map のコールバックに相当する関数を指定できます。

var source = Rx.Observable.from(array, function (x) { return x * x; });

シーケンスの Observable を直接生成することもできます。

var source = Rx.Observable.from({length: 10}, function(v, k) { return k + 1; });

要素の個数が少ないのであれば、Rx.Observable.of で指定する選択肢がある。

var source = Rx.Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

プロミス

var promise = new Promise(function (resolve, reject) {
    resolve(10);
});

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted');
});

var promise2 = new Promise(function (resolve, reject) {
    reject(new Error('reason'));
});

var source2 = Rx.Observable.fromPromise(promise2);

var subscription2 = source2.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); }
);

ジェネレーター

function* range() {
  var index = 0;
  while(true) {
    yield index++;
  }
}

Rx.Observable.from(range())
  .take(10)
  .subscribe(function (x) {
    console.log('値: %s', x);
});

Ajax

httpbin.org で練習します。

GET

ES6 Promise に対応している Fetch API を使ってみましょう。Promise から Observable を生成するには fromPromise を使います。

var promise = fetch('http://httpbin.org/get')
  .then(function(response) {
    return response.json();
  }).then(function(json) {
    return json.origin;
  }).catch(function(ex) {
    return ex;
  });

Rx.Observable.fromPromise(promise)
  .subscribe(
  function (value) {
    console.log(value);
  },
  function (err) {
    console.log('エラー: %s', err);
  },
  function () {
    console.log('完了');
  });

Rx.Observable.spawn を使ってジェネレーターによる非同期処理をやってみよう。

var Rx = require('rx');
var request = require('request');

var get = Rx.Observable.fromNodeCallback(request);

Rx.Observable.spawn(function* () {
  var data;

  try {
    data = yield get('http://httpbin.org/get').timeout(5000 /*ms*/);
  } catch (e) {
    console.log('Error %s', e);
  } 

  console.log(JSON.parse(data[0].body).origin);
}).subscribe();

POST

データの保存に FormData を使ってみよう。Content-Type の値は multipart/form-data になります。

var data = new FormData();
data.append('input', 'bar');

var promise = fetch('http://httpbin.org/post', {
  method: 'post',
  body: data
}).then(function(response) {
    return response.json()
  })
  .then(function(json) {
    return json.form;
  })
  .catch(function(ex) {
    return ex;
});

Rx.Observable.fromPromise(promise)
  .subscribe(
  function (value) {
    console.log(value);
  },
  function (err) {
    console.log('エラー: %s', err);
  },
  function () {
    console.log('完了');
  });

練習問題

FizzBuzz

FizzBuzz から練習をはじめよう。1 から 100 までの整数で構成されるシーケンスを生成するには Rx.Observable.range を使う。

var Rx = require('rx');
var source = Rx.Observable.range(1, 100);

source.subscribe(function(value) {
  if (value % 15 === 0) {
    console.log('fizzbuzz');
  } else if (value % 3 === 0) {
    console.log('fizz');
  } else if (value % 5 === 0) {
    console.log('buzz');
  } else {
    console.log(value);
  }
});

map を使って FizzBuzz の判定を複数の関数に分割してみよう。

var Rx = require('rx');
var source = Rx.Observable.range(1, 100);

source
  .map(function(value) {
    return value % 15 === 0 ? 'fizzbuzz' : value;
  })
  .map(function(value) {
    return value % 3 === 0 ? 'fizz' : value;
  })
  .map(function(value) {
    return value % 5 === 0 ? 'buzz' : value;
  })
  .subscribe(function(value) {
    console.log(value);
  });

複雑なシーケンスを生成できるように Rx.Observable.generate が用意されている。

var Rx = require('rx');

var source = Rx.Observable.generate(
  1,
  function (value) { return value < 101; },
  function (value) { return value + 1; },
  function (value) { 
    if (value % 15 === 0) {
      return 'FizzBuzz';
    } else if (value % 3 === 0) {
      return 'Fizz';
    } else if (value % 5 === 0) {
      return 'Buzz';
    } else {
      return value;
    }
  }
);

source.subscribe(function (x) {
  console.log(x);
});

今度は配列を繰り返す方式に取り組んでみよう。ES6 のジェネレーターで無限のシーケンスを生成し、take で最初の100をとる。

var Rx = require('rx');

function *list()
{
   var list = [
    '', '', 'Fizz', '', 'Buzz',
    'Fizz',  '', '', 'Fizz', 'Buzz',
    '', 'Fizz', '', '', 'FizzBuzz'
  ];

   while (true) {
     for (var e of list) {
       yield e;
     }
   }
}

Rx.Observable.from(list())
  .take(100)
  .map(function(value, index) {
    return value === '' ? index + 1: value;
  })
  .subscribe(function(value) {
    console.log(value);
  });

今度は Fizz と Buzz の生成を分割してみよう。それぞれ別のシーケンスを生成した後で Rx.Observable.withLatestFrom を使って統合する。

var Rx = require('rx');

function *fizz()
{
   var list = ['', '', 'Fizz'];

   while (true) {
    for (var e of list) {
      yield e;
    }
   }
}

function *buzz()
{
   var list = ['', '', '', '', 'Bizz'];

   while (true) {
    for (var e of list) {
      yield e;
    }
   }
}

var source1 = Rx.Observable.from(fizz());
var source2 = Rx.Observable.from(buzz());

var source = source1.withLatestFrom(
    source2,
    function (a, b) { return a + b; }
  );

source
  .take(100)
  .map(function(value, index) { return value === '' ? index + 1: value; })
  .subscribe(function (x) {
    console.log('%s', x);
  });

現在時刻を表示する

1秒単位で現在時刻を表示してみましょう。

<div id="result"></div>
var result = document.getElementById('result');
var source = Rx.Observable
    .interval(1000)
    .timeInterval();

source.subscribe(
  function (x) {
    result.innerHTML = (new Date()).toLocaleString();
  },
  function (err) {
    console.log('エラー: ' + err);
  },
  function () {
    console.log('完了');
});

ユーザーの入力を表示する

テキストボックスの入力をそのまま表示させてみましょう。Cycle.js のトップページにもコードサンプルが掲載されています。

<input id="textInput" type="text">
<div id="result"></div>
var textInput = document.querySelector('#textInput');
var result = document.querySelector('#result');

Rx.Observable.fromEvent(textInput, 'input')
  .pluck('target', 'value')
  .startWith('')
  .subscribe(function(value) {
    result.innerHTML = value;
  });

pluck の代わりに map を使うこともできます。

.map(function(ev) { return ev.target.value; })

クリックカウンター

ボタンをクリックするとカウンターの値が1つ増えるものとします。

<button id="plus">増やす</button>
<div id="result">0</div>
var plus = document.getElementById('plus');
var result = document.getElementById('result');

var source = Rx.Observable.fromEvent(plus, 'click')
  .map(function() { return +1; })
  .startWith(0)
  .scan(function(acc, value) { return acc + value; })
  .subscribe(function(value) { result.innerHTML = value; });

次にクリックするとカウンターの値が1減るボタンおよびカウンターの値が0になるリセットボタンを追加してみましょう。

<button id="plus">増やす</button>
<button id="minus">減らす</button>
<button id="reset">リセット</button>
<div id="counter"></div>
var counter = document.getElementById('counter');

var plus = Rx.Observable
  .fromEvent(document.getElementById('plus'), 'click').map(function() { return +1; });
var minus = Rx.Observable
  .fromEvent(document.getElementById('minus'), 'click').map(function() { return -1; });
var reset = Rx.Observable
  .fromEvent(document.getElementById('reset'), 'click').map(function() { return 0; });

Rx.Observable.merge(plus, minus, reset)
  .startWith(0)
  .scan(function(acc, value) { return value === 0 ? 0 : acc + value; })
  .subscribe(function(value) { counter.innerHTML = value; });

オペレーターのトップ10

Staltz 氏はトップ10のオペレーターを挙げています。

  • flatMap
  • merge
  • scan
  • combineLatest
  • withLatestFrom
  • map
  • filter
  • just
  • share
  • shareReplay

just

単一の値を含むシーケンスを返します。

var source = Rx.Observable.just(42);

source.subscribe(function (x) {
  console.log('Next: %s', x);
});

map

var source = Rx.Observable.range(1, 3)
  .map(function (value, index, observable) {
    return value * value;
  });

source.subscribe(function (value) {
  console.log(value);
});

filter

var source = Rx.Observable.range(1, 5)
  .filter(function (value, index, observable) {
    return value % 2 === 0;
  });

source.subscribe(function (value) {
  console.log(value);
});

scan

Obsevable のシーケンスにアキュムレーター関数を適用して、それぞれの中間的な値を返します。

var source = Rx.Observable.range(1, 10)
  .scan(function (acc, value, index, source) { return acc + value; }, 0);

source.subscribe(function (value) {
  console.log(value);
});

merge

複数の Observable を1つの Observable にマージします。

var source = Rx.Observable.of(1, 3, 5);
var source2 = Rx.Observable.of(2, 4, 6);

var source3 = source.merge(source2);

source3.subscribe(function (value) {
  console.log(value);
});

combineLatest

var source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

var source2 = Rx.Observable.interval(200)
  .map(function (i) { return 'Second: ' + i; });

var source = Rx.Observable.combineLatest(
  source1,
  source2
).take(4);

source.subscribe(function (value) {
  console.log(value);
});

withLatestFrom

var source1 = Rx.Observable.interval(100)
    .map(function (i) { return 'First: ' + i; });

var source2 = Rx.Observable.interval(200)
    .map(function (i) { return 'Second: ' + i; });

var source = source1.withLatestFrom(
    source2,
    function (s1, s2) { return s1 + ', ' + s2; }
).take(5);

source.subscribe(function (value) {
  console.log(value);
});

flatMap

var source = Rx.Observable.range(1, 5)
  .flatMap(function (x, i) {
    return Promise.resolve(x + i);
  });

source.subscribe(function (value) {
  console.log(value);
});

share

var source = Rx.Observable.range(1, 5);
var published = source.share();

function createObserver(tag) {
  return Rx.Observer.create(
    function (x) {
      console.log('Next: ' + tag + x);
    });
}

source.subscribe(createObserver('SourceA'));
source.subscribe(createObserver('SourceB'));

published.subscribe(createObserver('SourceC'));
published.subscribe(createObserver('SourceD'));

その他

just (return) と subscribe (forEach)

単一の値を返すには just、戻り値を伴わずに引数を展開したい場合、subscribe を使います。

scan と reduce の使いわけ

集約値の途中経過が必要な場合、scan、不要な場合、reduce とのことです。

zip を使う機会はほとんどない

Staltz 氏のつぶやきによると、zip を使う機会はほとんどなく combineLatest もしくは withLatestFrom もしくは flatMap を使うことを検討したほうがよいとのことです。

JWT 認証

Authenticable Observables: Build a Reactive App with JWT Auth は Fetch API を使うチュートリアルです。

hot と cold

Creating and Subscribing to Simple Observable Sequences にアナロジーによる説明があります。

  • Cold Observable: ムービー
  • Hot Observable: ライブパフォーマンス
  • リプレイされる Hot Observable: ビデオに記録されたライブパフォーマンス

Subject・イベントバス を避ける

Erik Meijer によると Subject は Rx の世界のミュータブル名変数であり、たいていの場合、必要ないとのことです。Bacon.js のチュートリアルでイベントバスを避けるべきである理由がくわしく書かれています。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした