José Valimさんの2017年3月17日付けのブログ記事Small data with Elixirの翻訳です。
Elixirで「データの集まり」を扱うモジュールには
- Enum
-
Stream
そして新しくできた -
Flow
があります。
ざっくり説明すると - Enumはデータを全部メモリ上に取り込んで処理する
- Streamはデータを必要な時に読み込んで逐次的に処理する(その名の通りStreamだから)
-
Flowはデータを必要な時に読み込んで、さらに並行処理する
という違いがあります。
なお、Flow
はGenStage
の上に作られているので先にElixirの公式サイトのAnnouncing GenStageを読んでおくといいでしょう。
また、こちらに処理速度を比較した"ちきさん"の記事があります1。よく「関数型言語を使うとマルチコアが有効に使える」といいますが実際ここまで簡単にその恩恵に預かれるのはすごいですよね。
これはElixirにおける「スモールデータ」(「ビッグデータ」と対比しています)に関するシリーズ記事の第1回目です。まず「スモールデータ」とは何か、それがなぜ重要なのか定義するところから始め、Flow
ツール及び次回以降の記事で何が出てくるのか簡単に述べます。
どのぐらい小さければスモールデータなのか?
スモールデータとは1台のマシンが希望する時間内に処理できるデータと定義します。このような作業は全てのデータが前もってわかっている場合はバッチ処理で、同期の必要がなく1台またはそれ以上のマシンが入力されるイベントの頻度に追いつけるならばストリーミングデータとして処理されるでしょう。
カリフォルニア大学のYanpei Chen, Sara AlspaughとRandy Katzは様々なMapReduceのワークロードを特徴づけ以下のように結論しました。
ジョブレベルのスケジューリングと全ての実行計画で、全てのワークロードには広がりを持ったジョブのタイプがありますが、最もよく出てくるのは「小さなジョブ」です。これらのジョブは同じワークロードに含まれる他のジョブと比べて全ての尺度で小さかったのです。小さなジョブは数十KBからGBのデータと関係しておりmap及びreduceの過程において様々なデータのパターンを示し、数十秒から数分の実行時間でした。
ケンブリッジ大学及びマックス・プランクソフトウェアシステム研究所のIonel Gog, Malte Schwarzkopf, Natacha Crooks, Matthew P. Grosvenor, Allen Clement, と Steven Hand はMusketeer2を開発したときに様々なソリューションを比較し次のような事柄を見つけました。
小さな(≦0.5GB)入力ではMetis単体マシンのMapReduceシステムが最も好成績をおさめました。実践の場では小さな入力が一般的: 40~80%のClouderaのMapReduceのジョブ及び70%のFacebookのtraceは1GB以下の入力だった、これは重要な事実です。
計算自体がボトルネックにならない代わりに外部ソースからデータを読み込むところがボトルネックになるということはしばしば起こりがちです。並列に外部ソースに対してストリームが入出力できるということはこのようなシステムにおいては最も大切なことです。
最後の例ですが、Frank McSherry, Michael Isardm, and Derek G. Murray が「スケーラビリティ!で、COSTは?(“Scalability! But at what COST?”)」を出しています。一定の問題領域のための一定のプラットフォームにおけるCOSTとはそのプラットフォームが比較対象となるシングルスレッド実装より高性能であるために要求されるハードウェア構成です3。
クラスターコンピューティング環境はラップトップの環境とは違います。前者は高い最大能力とレイテンシに対する高いスループットを遅いCPUコア、ストレージ、そしてメモリで実現させています。今ではラップトップはパーソナルコンピューターを最大能力こそ低いですがより速いCPUコア、ストレージ、及びメモリで実現させています。スケーラブルなシステムは大体の場合クラスターリソースにぴったりなのですが一方でピーク性能のためにはだいたいハードウェアについて検討しておくことが大事です。
言い換えると単体マシンでより効率的に解決できる多くの問題領域が存在するということです。なぜならそれは複雑さやネットワークコミュニケーションやデータのチェックポイントなどビッグデータシステムにありがちなコストを避けられるからです。
スモールデータを厳密に構成することは問題、データサイズ(またはデータの入力速度)及び期待される処理時間に依存します。この記事のシリーズではFlowライブラリを使って異なる問題領域の解を探していくことになるでしょう。Flowは単体マシンでの並行性にテコ入れするもので、おそらく小さなワークロードに適した選択肢としてチームがれっきとしたビッグデータのソリューションを必要としないで済むようになります。
GenStageとFlow
昨年我々はElixirのプロセス間のデータ交換を抽象化したGenStageを導入しました。GenStageはバックプレッシャーを念頭に置いて設計されておりElixirの開発者は外部システム、例えばApache Kafka, RabbitMQ, データベース, ファイルと言ったものからデータ処理システムをオーバーロードさせることなくデータを吸い出せます。
Stageはデータのプロデューサーまたはコンシューマーもしくはその両方になります。一つのプロデューサーStageは複数のコンシューマーを持つことがありそれらコンシューマーはお好みのストラテジーに基づいてイベントを受信します。これは開発者が並行性をテコ入れするための手段として任意にStageのパイプラインを作ることができます。
でも、もし開発者の責任でこれらのパイプラインを作らなければならないとしたら、結局はベストなものは作れないでしょう。それが我々がFlow
というツールをGenStage
上に作った理由です。Flowを使うと開発者はデータの計算をmap
やreduce
やfilter
などと言った関数型操作を使うことで表現できます。パラメータが一度指定されるとFlow
が結合された、データが通過するStageのネットワークの構築の面倒を見てくれます。古典的な(そして決まり文句的な)例としてFlowを使ってファイル上の単語数を数えてみましょう:
File.stream!("path/to/file", [], :line)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, map ->
Map.update(map, word, 1, & &1 + 1)
end)
|> Enum.into(%{})
[^5]
今はまだ上記の例の詳細がわからなくても心配しないでください。これからの投稿でこの例について再度触れることになります。
次のステップ
次の記事では遅延計算と非同期ストリームについてお話する予定です。それらはFlow
の話に入る前によい背景知識となるでしょう。もし一歩先んじてスタートしたいなら私のGenStage及びFlowに関するキーノートスピーチを観るか、Flowプロジェクトの優れたドキュメントを読んでください。
おまけ: 上のコードを試すには
Flow
はまだelixirの標準ライブラリとして取り込まれていないのでHexリポジトリから取り込む必要があります。例えば
$ mix new flowsample1
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/flowsample1.ex
* creating test
* creating test/test_helper.exs
* creating test/flowsample1_test.exs
Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:
cd flowsample1
mix test
Run "mix help" for more commands.
$ cd flowsample1/
$ vi mix.exs
(依存関係として {:flow, "~> 0.11.1"}をdefp deps do []に追加)
$ mix deps.get
$ iex -S mix
のようにします。