昨日は @tuchiro さんのElixirでSI開発入門 #3 主キーが"id "じゃない既存DBへの接続でした。

本日はElixirのGenStageに入門する#2 バックプレッシャーを理解するの続きです。

:black_square_button::black_large_square::black_square_button: お知らせ :black_square_button::black_large_square::black_square_button:
「fukuoka.ex#11:DB/データサイエンスにコネクトするElixir」を6/22(金)19時に開催します
私もETSとFlowを交えた発表で参加します!
ストリームとデータ処理に興味ある方は、是非ともご参加下さい!

image.png

はじめに

Flowは、並列ストリーム処理を小慣れた関数型の表記で簡潔に書けるモジュールです。今回はElixir Flowの集計で重要な役割を担う2段ステージ構造を扱います。前回までのGenStage入門の内容をふまえて説明していきます。

単語集計

まずは、テキストファイルから単語をカウントするベーシックなFlowのコードを公式に倣って書いてみます。

["elixir erlang LFE ruby elixir ruby"]
|> Flow.from_enumerable                   # 1.Producer-Consumer Stageへ展開
|> Flow.flat_map(&String.split(&1," "))   # 2.テキストファイルをワード分解
|> Flow.partition                         # 3.Partition ! 
|> Flow.reduce( fn -> %{} end,            # 4.単語のカウント 
           fn word, acc -> Map.update(acc, word, 1, & &1 + 1) end)
|> Enum.to_list

このコードを図解にしたものが以下です。ステージが展開されるイメージを表しています。

ElixirFlowBasic.png

以下のように用語設定をします( A段 = Stages A , B段 = Stages B )

コード解説

上記の図とコードを元に解説します。

  1. まずコード中のFlow.from_enumerableですが、ここでは適切なCPUコア数分の:producer-consumerのステージA段が並列展開されます。1行目のリストを:producer`としてそれぞれ自動購読します。

  2. flat_mapの行はA段で実行される処理になります。ここではテキストを単語単位に分解してリスト化しています。

  3. partitionでB段へ展開します。ここでも適切なCPUコア数分のステージが展開されます。(図ではB段のStage数が少なくなっていますが、指定しなければ通常はA段と同じ数です)B段はステージ終端なので:consumerとなります。

  4. reduceはB段のステージで実行されます。単語毎にカウントを行います。


なぜ2段?

さて、Flow.partitionでB段を設けたのですが、なぜ新たなステージが必要なのでしょうか?それは、並列処理を行うためにA段では同じ単語でも異なるステージへ送り込まれる可能性が高いからです。

"elixir"という単語が別々のステージに配布されたことを考えてみましょう。
そのままのステージで単語を集計してしまうと

{"elixir": 1},
{"elixir": 1}

同一単語なのに2行の集計が出来てします。結果を出すためには、各ステージからデータを集めて再集計が必用になり、これはシングルプロセスで行れるので並列性が損なわれてしまいます。

 集計も並列でやるには、どうすればよいでしょうか?


partitionとハッシュ関数

ここでハッシュ関数の出番となります。次の図を見てください。

ElixirFlowPatition.png

まず、各StageはGenStageのプロセスです。GenStageのPartitonDispatcherという配布器を使って、次のステージ段にデータの配布を行います。

この配布器にはHash関数が備わっており、デフォルトでは:erlang.phash2関数が使用されます。引数はハッシュのソースステージ数になります。

"elixir"という単語は2,"erlang"⇒0,"LFE"⇒1,ruby⇒2というステージ数-1以内の数が返ってきます。何度やっても必ず同じなので、同じ単語が同じステージに配布されることが担保されます。最後は集計結果を集めるだけで単語の集計処理が完了することになります。

Hash関数を活かして、B段へのデータの均等配布と並列での集計処理が行われる仕組みがお分かり頂けたでしょうか?。
 
このHash関数に渡すキーや、Hash関数そのものカスタマイズすることも可能です。

終わりに

Flowの2段ステージ構造を、partition関数に組み込まれたHash関数の役割とともに解説を行いました。皆さんもFlowを活かして集計に役立てて下さい!

@shufo さんのElixir Flowでlazyな並列分散処理を参考にしました。ありがとうございます!


次回は「Elixirから簡単にRustを呼び出せるRustler #1 準備編」になります。

明日は @zacky1972 さんの 「ZEAM開発ログv0.1.3 AI/MLを爆速にしたい! Flow のコードを OpenCL で書いてみる〜CPU編」です。お楽しみに!

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.