1. masakielastic
Changes in body
Source | HTML | Preview

Cycle.js の勉強を始めて、RxJS の練習量を積み重ねる必要性を感じたので、基本問題に取り組むことにしました。

セットアップ

RxJS

npm install rx
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>

RxJS-DOM

npm install rx-dom
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-dom/7.0.3/rx.dom.min.js"></script>

RxJS-jQuery

npm install rx-jquery
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs-jquery/1.1.6/rx.jquery.min.js"></script>

Fetch API

Can I use のサイトで Fetch API に対応するブラウザーの一覧 を調べることができます。古いブラウザーのために、Github がpolyfill を公開しています。

定義されるメソッドの一覧

RxJS は こちら、RxJS-DOM はこちら、RxJS-jQuery はこちらを参照。

生成

配列、数字のシーケンス

Rx.Observable.from を使って配列から Observable を生成できます。

var Rx = require('rx');

var array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
var source = Rx.Observable.from(array);

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); }
);

from の第2引数で map のコールバックに相当する関数を指定できます。

var source = Rx.Observable.from(array, function (x) { return x * x; });

プロミス

var promise = new Promise(function (resolve, reject) {
    resolve(10);
});

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted');
});

var promise2 = new Promise(function (resolve, reject) {
    reject(new Error('reason'));
});

var source2 = Rx.Observable.fromPromise(promise2);

var subscription2 = source2.subscribe(
  function (x) { console.log('onNext: %s', x); },
  function (e) { console.log('onError: %s', e); },
  function () { console.log('onCompleted'); }
);

ジェネレーター

function* range() {
  var index = 0;
  while(true) {
    yield index++;
  }
}

Rx.Observable.from(range())
  .take(10)
  .subscribe(function (x) {
    console.log('値: %s', x);
});

Ajax

httpbin.org で練習します。

GET

ES6 Promise に対応している Fetch API を使ってみましょう。Promise から Observable を生成するには fromPromise を使います。

var promise = fetch('http://httpbin.org/get')
  .then(function(response) {
    return response.json();
  }).then(function(json) {
    return json.origin;
  }).catch(function(ex) {
    return ex;
  });

Rx.Observable.fromPromise(promise)
  .subscribe(
  function (value) {
    console.log(value);
  },
  function (err) {
    console.log('エラー: %s', err);
  },
  function () {
    console.log('完了');
  });

Rx.Observable.spawn を使ってジェネレーターによる非同期処理をやってみよう。

var Rx = require('rx');
var request = require('request');

var get = Rx.Observable.fromNodeCallback(request);

Rx.Observable.spawn(function* () {
  var data;

  try {
    data = yield get('http://httpbin.org/get').timeout(5000 /*ms*/);
  } catch (e) {
    console.log('Error %s', e);
  } 

  console.log(JSON.parse(data[0].body).origin);
}).subscribe();

POST

まずは FormData を使ってみましょう。Content-Type の値は multipart/form-data になります。

var data = new FormData();
data.append('input', 'bar');

var promise = fetch('http://httpbin.org/post', {
  method: 'post',
  body: data
}).then(function(response) {
    return response.json()
  })
  .then(function(json) {
    return json.form;
  })
  .catch(function(ex) {
    return ex;
});

Rx.Observable.fromPromise(promise)
  .subscribe(
  function (value) {
    console.log(value);
  },
  function (err) {
    console.log('エラー: %s', err);
  },
  function () {
    console.log('完了');
  });

練習問題

FizzBuzz

FizzBuzz から練習をはじめよう。1 から 100 までの整数で構成されるシーケンスを生成するには Rx.Observable.range を使う。

var Rx = require('rx');
var source = Rx.Observable.range(1, 100);

source.subscribe(function(value) {
  if (value % 15 === 0) {
    console.log('fizzbuzz');
  } else if (value % 3 === 0) {
    console.log('fizz');
  } else if (value % 5 === 0) {
    console.log('buzz');
  } else {
    console.log(value);
  }
});

map を使って FizzBuzz の判定を複数の関数に分割してみよう。

var Rx = require('rx');
var source = Rx.Observable.range(1, 100);

source
  .map(function(value) {
    return value % 15 === 0 ? 'fizzbuzz' : value;
  })
  .map(function(value) {
    return value % 3 === 0 ? 'fizz' : value;
  })
  .map(function(value) {
    return value % 5 === 0 ? 'buzz' : value;
  })
  .subscribe(function(value) {
    console.log(value);
  });

複雑なシーケンスを生成できるように Rx.Observable.generate が用意されている。

var Rx = require('rx');

var source = Rx.Observable.generate(
  1,
  function (value) { return value < 101; },
  function (value) { return value + 1; },
  function (value) { 
    if (value % 15 === 0) {
      return 'FizzBuzz';
    } else if (value % 3 === 0) {
      return 'Fizz';
    } else if (value % 5 === 0) {
      return 'Buzz';
    } else {
      return value;
    }
  }
);

source.subscribe(function (x) {
  console.log(x);
});

今度は配列を繰り返す方式に取り組んでみよう。ES6 のジェネレーターで無限のシーケンスを生成し、take で最初の100をとる。

var Rx = require('rx');

function *list()
{
   var list = [
    '', '', 'Fizz', '', 'Buzz',
    'Fizz',  '', '', 'Fizz', 'Buzz',
    '', 'Fizz', '', '', 'FizzBuzz'
  ];

   while (true) {
     for (var e of list) {
       yield e;
     }
   }
}

Rx.Observable.from(list())
  .take(100)
  .map(function(value, index) {
    return value === '' ? index + 1: value;
  })
  .subscribe(function(value) {
    console.log(value);
  });

今度は Fizz と Buzz の生成を分割してみよう。それぞれ別のシーケンスを生成した後で Rx.Observable.withLatestFrom を使って統合する。

var Rx = require('rx');

function *fizz()
{
   var list = ['', '', 'Fizz'];

   while (true) {
    for (var e of list) {
      yield e;
    }
   }
}

function *buzz()
{
   var list = ['', '', '', '', 'Bizz'];

   while (true) {
    for (var e of list) {
      yield e;
    }
   }
}

var source1 = Rx.Observable.from(fizz());
var source2 = Rx.Observable.from(buzz());

var source = source1.withLatestFrom(
    source2,
    function (a, b) { return a + b; }
  );

source
  .take(100)
  .map(function(value, index) { return value === '' ? index + 1: value; })
  .subscribe(function (x) {
    console.log('%s', x);
  });

現在時刻を表示する

1秒単位で現在時刻を表示してみましょう。

<div id="result"></div>
var result = document.getElementById('result');
var source = Rx.Observable
    .interval(1000)
    .timeInterval();

source.subscribe(
  function (x) {
    result.innerHTML = (new Date()).toLocaleString();
  },
  function (err) {
    console.log('エラー: ' + err);
  },
  function () {
    console.log('完了');
});

ユーザーの入力を表示する

テキストボックスの入力をそのまま表示させてみましょう。Cycle.js のトップページにもコードサンプルが掲載されています。

<input id="textInput" type="text">
<div id="result"></div>
var textInput = document.querySelector('#textInput');
var result = document.querySelector('#result');

Rx.Observable.fromEvent(textInput, 'input')
  .pluck('target', 'value')
  .startWith('')
  .subscribe(function(value) {
    result.innerHTML = value;
  });

pluck の代わりに map を使うこともできます。

.map(function(ev) { return ev.target.value; })

クリックカウンター

ボタンをクリックするとカウンターの値が1つ増えるものとします。

<button id="plus">増やす</button>
<div id="result">0</div>
var plus = document.getElementById('plus');
var result = document.getElementById('result');

var source = Rx.Observable.fromEvent(plus, 'click')
  .map(function() { return +1; })
  .startWith(0)
  .scan(function(acc, value) { return acc + value; })
  .subscribe(function(value) { result.innerHTML = value; });

次にクリックするとカウンターの値が1減るボタンおよびカウンターの値が0になるリセットボタンを追加してみましょう。

<button id="plus">増やす</button>
<button id="minus">減らす</button>
<button id="reset">リセット</button>
<div id="counter"></div>
var counter = document.getElementById('counter');

var plus = Rx.Observable
  .fromEvent(document.getElementById('plus'), 'click').map(function() { return +1; });
var minus = Rx.Observable
  .fromEvent(document.getElementById('minus'), 'click').map(function() { return -1; });
var reset = Rx.Observable
  .fromEvent(document.getElementById('reset'), 'click').map(function() { return 0; });

Rx.Observable.merge(plus, minus, reset)
  .startWith(0)
  .scan(function(acc, value) { return value === 0 ? 0 : acc + value; })
  .subscribe(function(value) { counter.innerHTML = value; });

オペレーターのトップ10

Staltz 氏はトップ10のオペレーターを挙げています。

  • flatMap
  • merge
  • scan
  • combineLatest
  • withLatestFrom
  • map
  • filter
  • just
  • share
  • shareReplay

just

単一の値を含むシーケンスを返します。

var source = Rx.Observable.just(42);

source.subscribe(function (x) {
  console.log('Next: %s', x);
});

map

var source = Rx.Observable.range(1, 3)
  .map(function (value, index, observable) {
    return value * value;
  });

source.subscribe(function (value) {
  console.log(value);
});

filter

var source = Rx.Observable.range(1, 5)
  .filter(function (value, index, observable) {
    return value % 2 === 0;
  });

source.subscribe(function (value) {
  console.log(value);
});

scan

Obsevable のシーケンスにアキュムレーター関数を適用して、それぞれの中間的な値を返します。

var source = Rx.Observable.range(1, 10)
  .scan(function (acc, value, index, source) { return acc + value; }, 0);

source.subscribe(function (value) {
  console.log(value);
});

merge

複数の Observable を1つの Observable にマージします。

var source = Rx.Observable.of(1, 3, 5);
var source2 = Rx.Observable.of(2, 4, 6);

var source3 = source.merge(source2);

source3.subscribe(function (value) {
  console.log(value);
});

combineLatest

var source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

var source2 = Rx.Observable.interval(200)
  .map(function (i) { return 'Second: ' + i; });

var source = Rx.Observable.combineLatest(
  source1,
  source2
).take(4);

source.subscribe(function (value) {
  console.log(value);
});

withLatestFrom

var source1 = Rx.Observable.interval(100)
    .map(function (i) { return 'First: ' + i; });

var source2 = Rx.Observable.interval(200)
    .map(function (i) { return 'Second: ' + i; });

var source = source1.withLatestFrom(
    source2,
    function (s1, s2) { return s1 + ', ' + s2; }
).take(5);

source.subscribe(function (value) {
  console.log(value);
});

flatMap

var source = Rx.Observable.range(1, 5)
  .flatMap(function (x, i) {
    return Promise.resolve(x + i);
  });

source.subscribe(function (value) {
  console.log(value);
});

share

var source = Rx.Observable.range(1, 5);
var published = source.share();

function createObserver(tag) {
  return Rx.Observer.create(
    function (x) {
      console.log('Next: ' + tag + x);
    });
}

source.subscribe(createObserver('SourceA'));
source.subscribe(createObserver('SourceB'));

published.subscribe(createObserver('SourceC'));
published.subscribe(createObserver('SourceD'));

その他

just (return) と subscribe (forEach)

単一の値を返すには just、戻り値を伴わずに引数を展開したい場合、subscribe を使います。

scan と reduce の使いわけ

集約値の途中経過が必要な場合、scan、不要な場合、reduce とのことです。

zip を使う機会はほとんどない

Staltz 氏のつぶやきによると、zip を使う機会はほとんどなく combineLatest もしくは withLatestFrom もしくは flatMap を使うことを検討したほうがよいとのことです。

JWT 認証

Authenticable Observables: Build a Reactive App with JWT Auth は Fetch API を使うチュートリアルです。

hot と cold

Creating and Subscribing to Simple Observable Sequences にアナロジーによる説明があります。

  • Cold Observable: ムービー
  • Hot Observable: ライブパフォーマンス
  • リプレイされる Hot Observable: ビデオに記録されたライブパフォーマンス