Edited at

Reactive Extensions (Rx)のススメ


TL;DR

りあくてぃぶえくすてんしょんす?なにそれおいしいの?状態だった筆者がUniRxに触れてRxというものを知り、RxAndroidに触れて感動どころではなくRPG7を撃ち込まれたかと思うほどの衝撃を受けたのでこの感動を共有したかっただけの記事です。


Rxのメリット

そもそもRxとは何ぞや、という方もいるかもしれません。Rxとは、簡単に言うとLINQのやべぇやつです。


LINQ

class Person {

public string
name,
mail;
public int age;
}

var people = new Person[] {
new Person {name="John", mail="john@hoge.com", age=32},
new Person {name="Mike", mail="mike@fuga.com", age=41},
new Person {name="Paul", mail="paul@foo.com", age=26},
new Person {name="Stive", mail="stive@bar.com", age=29},
new Person {name="Oren", mail="oren@hoge.com", age=35}
};

var names = people.Where(p => p.age < 30)
.Select(p => p.name);

foreach(var n in names) {
Console.WriteLine(n);
}
// 出力:
// Paul
// Stive



Rx

class Person {

public string
name,
mail;
public int age;
}

var people = new Person[] {
new Person {name="John", mail="john@hoge.com", age=32},
new Person {name="Mike", mail="mike@fuga.com", age=41},
new Person {name="Paul", mail="paul@foo.com", age=26},
new Person {name="Stive", mail="stive@bar.com", age=29},
new Person {name="Oren", mail="oren@hoge.com", age=35}
};

Observable.ToObservable(people)
.Where(p => p.age < 30)
.Subscribe(
p => Console.WriteLine(p),
_ => {},
() => Console.WriteLine("Completed!"));
// 出力:
// Paul
// Stive


すごく似てないですか? でも、これだけじゃどこがすごいのか分からないですよね?実は、Rxはイベントや時間が絡む処理を得意としています。

例えば、HTTPでサーバー上のリソースをとってきたい場合、2つ以上のメソッドがあって呼ばれるタイミングで処理をしたいとき、一定間隔で処理をしたいときに役立ちます。例えば、次のコードはPython1ですが...


Rxがない場合

from time import sleep

count = 0
while True:
sleep(1)
count += 1
print(count)


このコードの場合、時間待ちにsleepを使用しているためメインスレッドで行えばフリーズしてしまいます。UIスレッドで行えば、UIが固まるのでユーザー体験の質は落ちてしまうでしょう。


Rxがあったら...

from rx.subjects import Subject

Subject.interval(1000).subscribe(print)

こう書けば自動的に別スレッドでの実行になる上、コード自体もコンパクトになりました!

そう、これこそが数多くあるRxの素晴らしいところの一つなのです。自動的に非同期実行にしてくれ、コードの可読性向上にもつながります。また、プログラマはロジックに集中できるので生産性向上にもつながるでしょう。

簡単に書くことができる、というのは素晴らしいことです。例えば、「連続でボタンが押された場合最初以外無視したい」というような場合もあると思います。Rxなら、buttonObservable.throttle_first(interval).subscribe(do_button_pressed())というきわめて単純な書き方でボタンの連打を防ぐことができます。

このように、Rxはヒューマンフレンドリな書き方ができます。Javascriptのようなイベント駆動型の言語において宣言的な書き方で実装できるというのはメンテナンス性という意味でもメリットがあるでしょう。イベント駆動型アーキテクチャというものは往々にして人間にやさしくありません。

そういう意味では、Rxはある種のシュガーシンタックスともいえるのではないでしょうか。


Rxで簡単なタイマーを実装する

UIの実装や関係ないところを簡単に作るため、この章はHTML/Javascriptを使って解説したいと思います。JavascriptのRxはRxJSというライブラリがあるので使いましょう。今回はCSSは書かないので、スタイルをつけたい場合は自分で書いてください。


UIの実装

<!doctype html>

<html>
<head>
<title>Rx タイマー</title>
<meta charset="utf-8" />
<script src="https://unpkg.com/@reactivex/rxjs@5.0.0/dist/global/Rx.min.js"></script>
<script src="main.js"></script>
</head>
<body>
<h1 id="head"></h1>
<form>
<input id="hours" type="number" value="0" min="0" />時間
<input id="minutes" type="number" value="0" min="0" max="60" />
<input id="seconds" type="number" value="0" min="0" max="60" />
<button type="button" id="start">スタート</button>
<button type="button" id="reset">リセット</button>
<button type="button" id="stop">ストップ</button>
</form>
<script>init();</script>
</body>
</html>


main.js

function init() {

window.timer = {closed: true};
window.indicator = document.getElementById("head");
window.hours = document.getElementById("hours");
window.minutes = document.getElementById("minutes");
window.seconds = document.getElementById("seconds");
window.startButton = document.getElementById("start");
window.resetButton = document.getElementById("reset");
window.stopButton = document.getElementById("stop");

indicator.innerText = "タイマー";

Rx.Observable.fromEvent(startButton, "click")
.filter(() => timer.closed)
.subscribe(onStart);

Rx.Observable.fromEvent(resetButton, "click")
.filter(() => timer.closed)
.subscribe(onReset);

Rx.Observable.fromEvent(stopButton, "click")
.filter(() => !timer.closed)
.subscribe(onStop);
}

function onStart(e) {
var time = (parseInt(hours.value) * 3600
+ parseInt(minutes.value) * 60
+ parseInt(seconds.value));
if(time == 0)
return;

indicator.innerText = formatTime(time);

window.timer = Rx.Observable.interval(1000).take(time).subscribe(function (t) {
var remined = time - t - 1;
if(remined != 0) {
indicator.innerText = formatTime(remined);
return;
}
indicator.innerText = "終了!";
Rx.Observable.timer(1000).subscribe(() => indicator.innerText = "タイマー");
});
}

function onStop(e) {
console.log("stoped")
if(timer)
timer.unsubscribe();
indicator.innerText = "停止しました";
Rx.Observable.timer(1000).subscribe(() => indicator.innerText = "タイマー");
}

function onReset(e) {
hours.value = 0;
minutes.value = 0;
seconds.value = 0;
indicator.innerText = "タイマー";
}

function formatTime(secs) {
var h = Math.floor(secs / 3600);
var m = Math.floor(secs / 60 % 60);
var s = Math.floor(secs % 60);
return (h ? h.toString() + "時間" : "") +
(m ? m.toString() + "" : "") +
(s ? s.toString() + "" : "");
}


これくらいならsetInterval使ったほうがいいような気がしますが、Canvasゲームなどでフレームを自分で管理することを考えると地獄でしょう。その場合、Rx.Observable.interval16ミリ秒(60fps)に一回呼ぶようにするとモジュールベースで作成できるので単純化できますね。

また、setIntervalの場合タイマーが動いているかどうかのフラグが必要ですが、こちらの場合はSubscriptionオブジェクトがストリームが生きているかのフラグを保持しているので変数が減ります。


まとめ

Rxは今や一般的といっても過言でないほどさまざまな言語で市民権を獲得している概念です。その本質はインタフェースの単純さ、何をしているのか一目でわかる設計など、「人間に寄り添った」技術であることが大きいと考えます。Java,Kotlin,Swift,C++,C#,PHPなど様々な言語で同じ見た目で同じ使い方ができるというのも魅力的です。ぜひ、この機会にRxの導入を検討してみてはいかがですか?





  1. RxPYを使っている。インストールはpip install rxでできる。