はじめに
Reactive Extensions
(以降 Rx
)と出会って早4~5年になるのですが、現在、ウチが手掛けてるアプリはコイツがないとやっていけないくらい重要なライブラリです。
で、ウチらのプロジェクトチームに参加していただく時に、何度か同じ説明をしているので、せっかくなので記事にしようかと思った次第です。
元々、C++において複雑な非同期処理をどうするかで技術選定を進めている中で Rx
と出会った経緯があり、開発言語が C# や Typescript(以降TS) だっけだったら、もしかすると Rx
を選んでいなかったかも知れません。まぁ、C# で async / await に慣れてたし、ありゃ最高です。但し、スレッドを切り替えたり待ち合わせたりの微調整は難しいので、その辺は微妙だけど、TSみたいな基本シングルスレッドならまぁ、async / await で良いと言えば良い。
で、C++で記事を書こうと思ったのですが、取り急ぎ、身内でで TS での需要がありまして、まずは TSで説明します。
C++ も Rx
の基本は概ね同じなのですが、「言語共通のメリット・デメリット」と「言語特有のメリット・デメリット」がありまして、一気に話すと分けわからなくなるので、取り敢えず、今回は TS でいきます。
ここでは、ウチらが関わるプロジェクトで使用されている Rx
のコードを理解できて、書けるようになることが目標です。
まぁ、こうして記事を書くのもそれなりの労力で、どうしても進みが遅くなると思うので、下記の2つのサイトは是非ブックマークして、たまに眺めて貰えると嬉しいです。
↓ ご本家
↓ 何度も何度も何度も読ませていただいた先生
なお、この私は Rx
を崇拝している訳でも、 Rx
の啓蒙活動をしている訳ではありません。 1
Rx
は本当に素晴らしいですが、実際に開発していると問題点も多いです。(更に、問題点が Rx
が原因じゃないものもあったりと結構複雑)
ただ、問題点、つまりデメリットとメリットを天秤にかけると、メリットが大きいという判断でプロジェクトチームが採用しているだけです。
今後、新しい非同期処理のパラダイムが登場(または思い付いたら)したら、それらを検討することになるでしょう。
非同期関数との戦い
まずは、何故 Rx
になったのかをザックリ説明するところから始めましょう。
こんな非同期関数 asyncFn()
があったとします。
function asyncFn(a: number, callback: (r: number) => void){
setTimeout(() => {
callback(a * 2);
}, 1000);
}
そうすると、こんな感じで使うことになるかと。
asyncFn(1, (r) => {
console.log(r); /* 2 */
});
まぁ、このレベルじゃ「戦い」なんて感じじゃないですよね。
次に、 asyncFn()
からのコールバックを受け、もう一回 asyncFn()
を呼ぶ場合どうしましょう?
asyncFn(1, (r1) => {
console.log(r1); /* 2 */
asyncFn(r1, (r2) => {
console.log(r2); /* 4 */
});
});
こうなりますね。
で、このレベルならまだしも、実際にアプリ作ってると、WebでPOSTしたり、DBアクセスしたり、ファイルI/Oだったり。。。
まぁ、非同期処理がたんまりあります。
すると、非同期処理が連鎖して。。。
asyncFn(1, (r1) => {
console.log(r1); /* 2 */
asyncFn(r1, (r2) => {
console.log(r2); /* 4 */
asyncFn(r2, (r3) => {
console.log(r3); /* 8 */
asyncFn(r3, (r4) => {
console.log(r4); /* 16 */
asyncFn(r4, (r5) => {
console.log(r5); /* 32 */
asyncFn(r5, (r6) => {
console.log(r6); /* 64 */
});
});
});
});
});
});
こうなりますわな。
そろそろ、やばそうな「戦い」になってきていませんか?
このサンプルは console に出力しているだけですが、実際にアプリを開発するとなると、分岐があったり、中断したり、色々ある訳です。
そうなると、パッと見てなにやってるか良く分からないコードになりまする。 2
戦うためには武器が必要な訳で、その武器を物色していたところ async / await や coroutine、そして Rx
に出会った訳です。
この Rx
は、様々な言語にポーティングされていて、 Rx
はアルゴリズムであって、「言語仕様に依存していない」 ことが採用に至った最大の理由です。
あと、C#でLINQのメソッドチェーンに慣れていたとか、 Rx
でよく表現されるこんな図を理解してみたいという興味が大きかったかも知れません。
当時は、さっぱり分からなかったけど、1年くらいして、ようやく理解できるようになりました。(難しいからじゃありません。私がアホだからです) 3
observable
さて、この単語、私は Rx
に出会って初めて発音しました。
オ・・・オブザーバブル?
何だか発音しにくい。。。
でも、 Rx
やっていると日に10回以上会話で出てきますので、まず慣れてください。
デザインパターンの「オブザーバー(observer)」は「観察者」ですが、「オブザーバブル(observable)」は「観測可能(な何か)」です。つまり、observableはデザインパターンの「サブジェクト(subject)」に該当します。
と、言われて、ピンとこなくても大丈夫です。
逆に、オブザーバーパターンを知っている方は、一旦、用語を忘れてください。
Rx
はオブザーバーパターンそのものですが、そのオブザーバーパターンを更にパラダイムまで昇格していて、このパラダイムで登場する用語が、オブザーバーパターンで登場する用語と微妙に違ったりします。
さて、前置きが長くなりましたが、この observable
ですが、 Rx
の世界では、全てが observable
です。
この、 observable
を「観測する(subscribe)」ことが連鎖する世界、これが Rx
です。
まぁ、「観測する」という何とも仰々しい言い回しですが、恥ずかしがらず、「観測する」を使いましょう。 4
次に「何を観測するんだ?」と、いうと、3種類ございます。
- 値
- エラー
- 終了
これだけです。
私が、初めて Rx
に出会った時、これだけの説明では「????」でしたので、まだ、ほんわかした理解で大丈夫です。
それに、これだけで、処理分岐やループってどうやるのみたいな疑問点も出てくると思いますが、大丈夫です。処理分岐もループ処理も余裕で記述できます。更に、もっと便利な機能が Rx
にはあるのです。
非同期関数から observable を作る
そろそろ飽きてきたと思うので、具体的なコードを見てみましょう。
import { Observable } from "rxjs";
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>((subscriber) => {
asyncFn(a, (r)=>{
subscriber.next(r);
subscriber.complete();
});
});
}
これは、先ほどの asyncFn()
を observable
にする関数 rxAsyncFn()
です。
良く見ると、 rxAsyncFn()
は数行に及ぶコードなのに、いきなり return
から始まっていますよね。
いきなり return
に、当時の私は「違和感」というか「新鮮だぁ~」みたいな感覚がありましたが、皆さんはどうでしょう?
(実は、この感覚が後に関数型プログラミングの入口だと後から知った感じです) 5
で、この rxAsyncFn()
は observable
インスタンスを生成して「以上、終了!」な関数です。
function rxAsyncFn(a: number): Observable<number> {
return new Observable<number>(/*引数*/);
}
で、ここの「引数」が、関数ですね。
(subscriber) => {
asyncFn(a, (r)=>{
subscriber.next(r);
subscriber.complete();
});
}
さて、ここが今回の重要ポイントです!
この引数で渡された関数は、観測するとき実行される
つまり、
const hoge = rxAsyncFn(1);
このコードでは、非同期関数 asyncFn()
が呼び出されることはありません。
何故なら、 rxAsyncFn()
は、そうです、 observable
ですよね!
observable
は観測することで引数である関数が実行されます。 6
で、この引数である関数が実行されると、 asyncFn()
を呼び出します。
そしてコールバック関数内で、こんなことやってます。
subscriber.next(r);
subscriber.complete();
まず、 subscriber.next(r);
は値を発行する。つまり、観測者が値 r
を受領するということになります。
あ、ここで「発行する(emit)」という新しいワードが登場しました。この発行というのは observable
の立場から見た言い回しです。観測者はobservable
が発行する値を観測することになります。
そして、 subscriber.complete();
は、 observable
が全ての値を発行し終えたことを観測者に知らせるための発行になります。
さぁ、ついさっきの話を思い出してください。
observable
を観測するって「何を観測するんだ?」というと。。。
- 値
- エラー
- 終了
でしたね。
そう、ここでの subscriber.next(r);
は 値
、そして、 subscriber.complete();
は 終了
に対応しています。
エラー
については、そのうち出てきますが、今は忘れてください。
重要なのは、この 関 連 性 です。
observable を観測する
お待ちかね、ようやく observable
を観測する準備ができました。
コードはこんな感じになります。
rxAsyncFn(1)
.subscribe({
next: (r) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
2
complete
どうですか?
イメージできますか?
rxAsyncFn()
は observable
を返却します。そして、間髪入れず、 subscribe()
つまり観測することで、 next()
と complete()
が呼び出されます。
そう!
これが、まさに observable
を観測するということです。
observableを連鎖させる
これだけでは、「何だか面倒なコードを書かなきゃならないな~」くらいな感じじゃないですか?
違いますよ~
これからが本領発揮です!!
思い出してください。
非同期処理が連鎖するとこんなコードでしたよね?
asyncFn(1, (r1) => {
console.log(r1); /* 2 */
asyncFn(r1, (r2) => {
console.log(r2); /* 4 */
asyncFn(r2, (r3) => {
console.log(r3); /* 8 */
asyncFn(r3, (r4) => {
console.log(r4); /* 16 */
asyncFn(r4, (r5) => {
console.log(r5); /* 32 */
asyncFn(r5, (r6) => {
console.log(r6); /* 64 */
});
});
});
});
});
});
これが、こうなります。
import { mergeMap } from "rxjs/operators";
rxAsyncFn(1)
.pipe(mergeMap((r /* r1 */) => {
console.log(r);
return rxAsyncFn(r);
}))
.pipe(mergeMap((r /* r2 */) => {
console.log(r);
return rxAsyncFn(r);
}))
.pipe(mergeMap((r /* r3 */) => {
console.log(r);
return rxAsyncFn(r);
}))
.pipe(mergeMap((r /* r4 */) => {
console.log(r);
return rxAsyncFn(r);
}))
.pipe(mergeMap((r /* r5 */) => {
console.log(r);
return rxAsyncFn(r);
}))
.subscribe({
next: (r /* r6 */) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
2
4
8
16
32
64
complete
pipe()
7 と mergeMap()
8 は後ほど説明することになりますが、どうですか?
何だか分かりやすい気がしません?
「これやったら次」
「これやったら次」
「これやったら次」
「これやったら次」
みたいな感じに見えません?
出だしは、
rxAsyncFn(1)
から始まって、
.pipe(mergeMap((r) => {
console.log(r);
return rxAsyncFn(r);
}))
こいつが繰り返されて。。。
.subscribe({
next: (r /* r6 */) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
最後はこんな感じ。
そして、改めてよ~~~く見ると、「.(ドット)」で繋がっているにお気付きでしょうか?
この「.(ドット)」は、新し言語仕様なんかじゃありません。
「メソッドチェーン」という手法で、先ほどの
rxAsyncFn(1)
.subscribe({
next: (r) => {
console.log(r);
},
complete: () => {
console.log("complete");
}
});
.subscribe({
の箇所がメソッドチェーンです。
つまり、
pipe(mergeMap((r) => {
console.log(r);
return rxAsyncFn(r);
}))
この pipe()
は observable
を返却しているのです。
そして、 observable
には pipe()
関数があるのです。
その pipe()
だか flat_map()
の中で return rxAsyncFn()
によって observable
を生成して。。。
ややこしいですね。
でも、大丈夫!
慣れます。
いや、どうか慣れてください。(苦笑)
ということで、この一連の observable
の連鎖は、 rxAsyncFn(1)
から開始して、 pipe()
と謎の flat_map()
で繋いで、これを subscribe()
つまり観測するという、長~~~い1文(って表現で良いかな?)で構成されているプログラムです。
序章のまとめ
どうです? 気持ち悪いでしょ?(笑)
まぁ、 Promise
を使ってもメソッドチェーンが登場するので、すんなり入ってくる方もいるかと思います。
そして Promise
を熟知されている方は、「Promiseでいいじゃん」ってなるかも知れません。
確かに Rx
を使わなくても Promise
で十分なケースがあります。「あります」というか多いです。
私は、その十分なケースで、無理に Rx
を使う必要はないと思います。
ただ、 Rx
は Promise
的な「機能」ではなく「パラダイム」です。この「パラダイム」がかなり強力でして、この「パラダイム」を徐々に説明していければと思います。 9
また、時間がある時に続編書きますね。
-
ただ、C から C++ へのパラダイムシフトと同様、私のプログラマ人生に与えた影響は計り知れないです。 ↩
-
昔、この連鎖をなくすために、OSによくある非同期メッセージングシステムを採用した強者がいて、もう、何と言うか、どういう順で何が起きるのか追っかけるだけでかなりのカロリーを消費して、メンテナーが劇的なダイエットに成功した実例があります。 ↩
-
ちなみに、
map
みたいな比較的簡単なオペレータの説明では、図中の「〇」を動かせるというのを知っている人は少ないんじゃないかな? 私はついさっき知りましたよ。 http://reactivex.io/documentation/operators/map.html ↩ -
subscribe は「購読する」が正しいですが、ここは何故かオブザーバパターンで登場する「観測する」という言い回しになっていますが、ここは方言ということで勘弁してください。 ↩
-
Rx
はObservable.create
を使うのが一般的ですが、rxjs
では諸般の事情でnew Observable
が推奨されています。 ↩ -
全ての
observable
が観測開始で引数の関数が実行される訳ではありません。observable
のホットとコールドについては後で説明します。 ↩ -
1個の
pipe()
で何個も接続できるのですが、理由あって冗長ですが毎回pipe(flat_map(
と書いてます。 ↩ -
Rx
の本家ではFlatMap
と呼ばれています。 http://reactivex.io/documentation/operators/flatmap.html ↩ -
偉そうに言ってますが、
Rx
の全機能を使ったことはないのでご了承ください。(苦笑) ↩