この記事は「fukuoka.ex x ザキ研 Advent Calendar 2017」10日目です。
昨日は @Takeshi-Kogu さんの「Elixirでファイルの入出力をしてみた」でした。
はじめに
前回,「Elixirで一千万行のJSONデータで遊んでみた」 という記事を書きました。
そこで,@twinbee さんにこんなコメントを頂きました。
q2_2なんですが、Reduceを挟まないこの処理だとFlow.partitionは必用ないと思います。
GenStageが多段になってるので、本来の性能よりかなり遅い結果になると思います。
ちなみに私のPCだと、partitionを外した場合42.717秒 ⇒ 36.409秒になりました。
並列処理初心者かつElixir初心者の私は,つまりどういうことだ・・・?となったので,せっかくなので自分なりにまとめておこうと思って記事を書きます。
Flow.partitionについて
Flow.partition
を実行すると,新しくStageが追加されます。
新しく追加されるStageは,Hash関数によって,同じデータは同じStageに配布されるようになります。
これによって,データが重複すると困る集計処理であっても問題なく処理することができます。
前回の記事で利用したサンプルコードの問題の部分
defmodule Basic do
def q2_2 do
"data.json"
|> File.stream!
|> Flow.from_enumerable
|> Flow.map(fn d -> Poison.decode!(d) end)
|> Flow.partition
|> Flow.filter(fn d -> d["age"] <= 20 end)
|> Enum.count
end
end
Flow.from_enumerable
でStageに並列展開されていて,その後並列展開されたデータそれぞれで, JSONデータのDecode -> Filter
と処理を行っています。
つまり,並列展開されたデータが重複するような処理は存在せず,並列展開されたデータを最後に集めて Enum.count
しただけなので, Partitioning
は不要だったというわけです。
また, Partitioning
を行う際にHash関数による処理が走るので無駄に時間がかかる結果となりました。
修正後のコードと結果
defmodule Basic do
def q2_2 do
"data.json"
|> File.stream!
|> Flow.from_enumerable(stages: 2)
|> Flow.map(fn d -> Poison.decode!(d) end)
|> Flow.filter(fn d -> d["age"] <= 20 end)
|> Enum.count
end
end
$ /usr/bin/time mix run -e "Basic.q2_2"
real | user | sys | |
---|---|---|---|
q2_2(前回) | 45.60 s | 116.39 s | 6.49 s |
q2-2(今回) | 37.16 s | 90.33 s | 3.07 s |
Partitioning
で結構時間をロスしてたことが分かりますね。
終わりに
今回は, Flow.partition
について書いていきました。
記事の内容では,前回の記事に対応した程度のことしか書いていないのでほんの一部の内容だと思います。
記事を書く中で,より知見が深まっていき,楽しみながら学ぶことができているので,とてもいいですね。
@twinbee さんの Elixir並列処理「Flow」の2段ステージ構造を理解する を参考にさせていただきました。ありがとうございます。
明日は, @zacky1972 さんの記事になります。お楽しみに!