8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Flowの公式サンプルコードからFlow.partition/2とFlow.reduce/3の使い方を探る

Last updated at Posted at 2023-01-01

はじめに

下記の記事で並列化を試行錯誤していたのですが,Flowについてよくわからなくなってしまいました.

そこで,Flowの基礎に立ち返るべく,下記の公式ドキュメントに立ち返ることにしました.

公式ドキュメントのコード例

Flowの公式ドキュメントには下記のコード例が紹介されています.

As an example, let's implement the classic word counting algorithm using Flow. The word counting program will receive one file and count how many times each word appears in the document.
例として、Flow を使用して従来の単語カウント アルゴリズムを実装してみましょう。 単語カウント プログラムは 1 つのファイルを受け取り、各単語がドキュメントに出現する回数をカウントします。

Enumを使った場合

Enumを使った単語カウントのプログラムは次のとおりです.

File.stream!("path/to/some/file")
|> Enum.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

使用している関数とコードの解説を列挙します.

  1. File.stream!/3: "path/to/some/file"をパスとみなして,File.Streamを返します.EnumerableCollectableを実装していて,読み込みの時にはEnumerable,書き込みの時にはCollectableとして機能します.ここではEnumerableとして機能します.また,第3引数のオプションを指定していないので,デフォルトで改行"\r\n""\n"に正規化した上で,"\n"で分割して各要素に格納します.
  2. Enum.flat_map/2: 各要素に第2引数の関数を適用し,結果をflatten,つまり[[1, 2, 3], [4, 5, 6]]のように多重になっているリストを[1, 2, 3, 4, 5, 6]のように平坦にします.
    1. String.split/3: 第1引数の文字列を第2引数で与えられる部分文字列で分割したリストを生成します.ここでは,Streamで渡されたEnumerableの各要素を" "で分割します.
  3. Enum.reduce/3: 第2引数をアキュムレータの初期値として,第3引数の関数をアキュムレータ付きで呼び出します.
    1. Map.update/4: 第1引数のMapを第2引数をキーとし,存在しなかった時には第3引数を値に,存在した時には第4引数の関数で値を更新します.ここでは,各要素をキーとして,キーが存在しなかった時には値を1に,存在した時には値に1加えます.結果として,String.split/3で分割された各要素の現れる個数を数えることになります.
  4. Enum.to_list/1: Enumerableをリストにします.

以上を踏まえると,入力として与えるファイルは,空白や改行文字で区切った単語で構成されるテキストファイルということになります.

サンプルコードの疑問点

ここで疑問になるのが,テキストファイルに改行文字が含まれる場合,String.trim/1あるいはString.trim/2を適用していないので,単語に改行文字が含まれるのではないか?という点です.

実際,確認してみたところ,テキストファイルに改行文字が含まれる場合は,単語に改行文字が含まれた状態でカウントされるので,同じ単語が改行位置にある場合とない場合で別々の単語としてカウントされてしまいます.これは実務上は不具合となるので,次のように修正が必要です.

File.stream!("path/to/some/file")
|> Enum.map(&String.trim/1)
|> Enum.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

Streamを使った場合

Streamを使った単語カウントのプログラムは次のとおりです.

File.stream!("path/to/some/file")
|> Stream.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

Enum.flat_map/2の代わりにStream.flat_map/2を使っただけです.

前述のサンプルコードの疑問点について

前述のテキストファイルに改行文字が含まれる場合の問題点はStream版でも解消されていないので,次のように修正する必要があります.

File.stream!("path/to/some/file")
|> Stream.map(&String.trim/1)
|> Stream.flat_map(&String.split(&1, " "))
|> Enum.reduce(%{}, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

Flowを使った場合

Flowを使った単語カウントのプログラムは次のとおりです.

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

ここで,新しく次の関数が登場します.

  • Flow.from_enumerable/2: Enumerableを元にしてFlowを形成します.
  • Flow.partition/2: Flowに対して,新しいパーティションを作成します.
  • Flow.reduce/3: Flowに対して,与えられたアキュムレータを用いて与えられた値を集約します.Enum.reduce/3との違いは,第2引数が初期値だったのを,初期値を与えるような関数を与えるというように変える必要があります.Flow.reduce/3では集約をウィンドウごとに並列に行うのですが,そのウィンドウごとに初期値を生成するような関数を与えます.この例では,ウィンドウごとに空のマップ%{}を与えます.

Flow.partition/2の役割について

Flow.partition/2がどのような役割をしているかについては,下記に書かれています.

詳しくは上記を読んでほしいのですが,要約すると,異なる行に同じ単語が登場した時に,Flow.partition/2がないと別の単語であるかのように集計されてしまう問題が生じるのですが,Flow.flat_map/2Flow.reduce/3の間に,Flow.partition/2を入れることで,同じハッシュ値を持つ値を1つのパーティションにまとめてくれるので,同じ単語を1つにまとめて集計してくれるようになるということです.

Flow.partition/2を入れるかどうかで結果が変わってしまうかどうかを検査するには,Flow.from_enumerable/2max_demand: 1を指定して1つの要素ごとに並列化した上で,Flow.partition/2を入れた場合と入れなかった場合で結果が変わるかを見るのが確実であるということです.max_demandの値が1より大きい場合だと検出できない場合があります.

前述のサンプルコードの疑問点について

前述のテキストファイルに改行文字が含まれる場合の問題点はFlow版でも解消されていないので,Enum版同様に,File.stream!/3|> Flow.flat_map(&String.split(&1, " "))の間に,|> Flow.map(&String.trim/1)を入れる必要があります.Flowにした後で|> Flow.map(&String.trim/1)を実行する必要があるので,Flow.from_enumerable/2の直後に入れることになります.

ここで,前述のFlow.partition/2のことを考えると,次の2つのバリエーションがありえます.

Flow.partition/2が1つのバージョン: flow_with_partitionと呼びます.

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.map(&String.trim/1)
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

Flow.partition/2が2つのバージョン: flow_with_independent_partitionと呼びます.

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.map(&String.trim/1)
|> Flow.partition()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

この記事では,どちらがパフォーマンス上,妥当なのかを検証してみたいと思います.

これに対し,Flow.partition/2を全く入れないバージョンを,flow_without_partitionと呼ぶことにします(比較のため).

ランダムテストデータの生成

ランダムテストデータを自動生成してみることを考えます.仕様としては,それぞれ0からmのランダム値数のランダムな英単語で構成される行が,0からnのランダム値の行数存在するようなテキストファイルを生成します.ランダムな英単語というのは https://www.mit.edu/~ecprice/wordlist.10000 からランダムに英単語を生成します.

下記のReqを使って前述のURLから単語集を取得することにします.

persistent_term を用いて単語を記録することにしました.

次のようなプログラムを書きました.

defmodule WordGen do
  @url_wordlist_10_000 "https://www.mit.edu/~ecprice/wordlist.10000"
  @temporary_file_prefix "words"

  def init() do
    :persistent_term.put(WordGen, get_words())
  end

  def word(n) do
    WordGen
    |> :persistent_term.get()
    |> elem(n)
  end

  def num_words() do
    WordGen
    |> :persistent_term.get()
    |> tuple_size()
  end

  def random_word() do
    (num_words() - 1)
    |> :rand.uniform()
    |> word()
  end

  def random_line(n) do
    0..:rand.uniform(n)
    |> Enum.map(fn _ -> random_word() end)
    |> Enum.join(" ")
  end

  def random_text(n, m) do
    (0..:rand.uniform(n)
    |> Enum.map(fn _ -> random_line(m) end)
    |> Enum.join("\n"))
    <> "\n"
  end

  defp get_words() do
    Req.get!(@url_wordlist_10_000).body
    |> String.split()
    |> List.to_tuple()
  end

  def temporary_filename(n, m) do
    @temporary_file_prefix <> "_#{n}_#{m}.txt"
  end

  def gen_file(n, m) do
    temporary_filename(n, m)
    |> File.open!([:write])
    |> IO.binwrite(random_text(n, m))
  end
end

プログラムの説明は後で書きます.

性能検証実験

Enum版,Stream版と2つのFlow版,flow_with_partitionflow_with_independent_partitionのどれがパフォーマンス上,妥当なのかを検証してみたいと思います.また比較のために,正しく動作しないflow_without_partitionも見てみます.

検証方法

Bencheeを使って性能を計測します.

検証プログラム

Mix.install([
  {:req, "~> 0.3"},
  {:flow, "~> 1.2"},
  {:benchee, "~> 1.1"}
])

defmodule WordGen do
  @url_wordlist_10_000 "https://www.mit.edu/~ecprice/wordlist.10000"
  @temporary_file_prefix "words"

  def init() do
    :persistent_term.put(WordGen, get_words())
  end

  def word(n) do
    WordGen
    |> :persistent_term.get()
    |> elem(n)
  end

  def num_words() do
    WordGen
    |> :persistent_term.get()
    |> tuple_size()
  end

  def random_word() do
    (num_words() - 1)
    |> :rand.uniform()
    |> word()
  end

  def random_line(n) do
    0..:rand.uniform(n)
    |> Enum.map(fn _ -> random_word() end)
    |> Enum.join(" ")
  end

  def random_text(n, m) do
    (0..:rand.uniform(n)
    |> Enum.map(fn _ -> random_line(m) end)
    |> Enum.join("\n"))
    <> "\n"
  end

  defp get_words() do
    Req.get!(@url_wordlist_10_000).body
    |> String.split()
    |> List.to_tuple()
  end

  def temporary_filename(n, m) do
    @temporary_file_prefix <> "_#{n}_#{m}.txt"
  end

  def gen_file(n, m) do
    temporary_filename(n, m)
    |> File.open!([:write])
    |> IO.binwrite(random_text(n, m))
  end
end

defmodule WordCounter do
  def by_enum(filename) do
    filename
    |> File.stream!()
    |> Enum.map(&String.trim/1)
    |> Enum.flat_map(&String.split(&1, " "))
    |> Enum.reduce(%{}, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()
    |> Map.new()
  end

  def by_stream(filename) do
    filename
    |> File.stream!()
    |> Stream.map(&String.trim/1)
    |> Stream.flat_map(&String.split(&1, " "))
    |> Enum.reduce(%{}, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()
    |> Map.new()
  end

  def by_flow_with_partition(filename) do
    filename
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(&String.trim/1)
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()
    |> Map.new()
  end

  def by_flow_with_independent_partition(filename) do
    filename
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(&String.trim/1)
    |> Flow.partition()
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()
    |> Map.new()
  end

  def by_flow_without_partition(filename) do
    filename
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(&String.trim/1)
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, & &1 + 1)
    end)
    |> Enum.to_list()
    |> Map.new()
  end
end

:rand.seed(:exsss, {1, 2, 3})
WordGen.init()

inputs =
  [{100, 100}, {1000, 1000}, {10000, 10000}]
  |> Enum.map(fn {n, m} ->
    WordGen.gen_file(n, m)
    {"{#{n}, #{m}}", WordGen.temporary_filename(n, m)}
  end)
  |> Map.new()
  |> IO.inspect()

inputs
|> Enum.map(fn {size, filename} ->
  enum = WordCounter.by_enum(filename)
  stream = WordCounter.by_stream(filename)
  flow_with_partition = WordCounter.by_flow_with_partition(filename)
  flow_with_independent_partition = WordCounter.by_flow_with_independent_partition(filename)
  flow_without_partition = WordCounter.by_flow_without_partition(filename)
  IO.inspect(stream == enum, label: "#{size} stream x enum")
  IO.inspect(enum == flow_with_partition, label: "#{size} enum x flow_with_partition")
  IO.inspect(enum == flow_with_independent_partition, label: "#{size} enum x flow_with_independent_partition")
  IO.inspect(enum == flow_without_partition, label: "#{size} enum x flow_without_partition")
end)

Benchee.run(
  %{
    "WordCounter.by_enum" => fn input -> WordCounter.by_enum(input) end,
    "WordCounter.by_stream" => fn input -> WordCounter.by_stream(input) end,
    "WordCounter.by_flow_with_partition" => fn input -> WordCounter.by_flow_with_partition(input) end,
    "WordCounter.by_flow_with_independent_partition" => fn input -> WordCounter.by_flow_with_independent_partition(input) end,
    "WordCounter.by_flow_without_partition" => fn input -> WordCounter.by_flow_without_partition(input) end
  },
  inputs: inputs
)

M1 Ultra Mac Studio での実行結果

% elixir flow_bench.exs 
%{
  "{100, 100}" => "words_100_100.txt",
  "{1000, 1000}" => "words_1000_1000.txt",
  "{10000, 10000}" => "words_10000_10000.txt"
}
{100, 100} stream x enum: true
{100, 100} enum x flow_with_partition: true
{100, 100} enum x flow_with_independent_partition: true
{100, 100} enum x flow_without_partition: true
{1000, 1000} stream x enum: true
{1000, 1000} enum x flow_with_partition: true
{1000, 1000} enum x flow_with_independent_partition: true
{1000, 1000} enum x flow_without_partition: true
{10000, 10000} stream x enum: true
{10000, 10000} enum x flow_with_partition: true
{10000, 10000} enum x flow_with_independent_partition: true
{10000, 10000} enum x flow_without_partition: false
Operating System: macOS
CPU Information: Apple M1 Ultra
Number of Available Cores: 20
Available memory: 128 GB
Elixir 1.14.2
Erlang 25.2

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
reduction time: 0 ns
parallel: 1
inputs: {100, 100}, {1000, 1000}, {10000, 10000}
Estimated total run time: 1.75 min

Benchmarking WordCounter.by_enum with input {100, 100} ...
Benchmarking WordCounter.by_enum with input {1000, 1000} ...
Benchmarking WordCounter.by_enum with input {10000, 10000} ...
Benchmarking WordCounter.by_flow_with_independent_partition with input {100, 100} ...
Benchmarking WordCounter.by_flow_with_independent_partition with input {1000, 1000} ...
Benchmarking WordCounter.by_flow_with_independent_partition with input {10000, 10000} ...
Benchmarking WordCounter.by_flow_with_partition with input {100, 100} ...
Benchmarking WordCounter.by_flow_with_partition with input {1000, 1000} ...
Benchmarking WordCounter.by_flow_with_partition with input {10000, 10000} ...
Benchmarking WordCounter.by_flow_without_partition with input {100, 100} ...
Benchmarking WordCounter.by_flow_without_partition with input {1000, 1000} ...
Benchmarking WordCounter.by_flow_without_partition with input {10000, 10000} ...
Benchmarking WordCounter.by_stream with input {100, 100} ...
Benchmarking WordCounter.by_stream with input {1000, 1000} ...
Benchmarking WordCounter.by_stream with input {10000, 10000} ...

##### With input {100, 100} #####
Name                                                     ips        average  deviation         median         99th %
WordCounter.by_enum                                  1565.88        0.64 ms    ±18.11%        0.62 ms        0.85 ms
WordCounter.by_stream                                1512.57        0.66 ms    ±10.85%        0.65 ms        0.90 ms
WordCounter.by_flow_without_partition                 787.66        1.27 ms     ±7.63%        1.26 ms        1.55 ms
WordCounter.by_flow_with_partition                    372.14        2.69 ms     ±8.07%        2.67 ms        3.32 ms
WordCounter.by_flow_with_independent_partition        207.71        4.81 ms    ±51.27%        4.52 ms       12.73 ms

Comparison: 
WordCounter.by_enum                                  1565.88
WordCounter.by_stream                                1512.57 - 1.04x slower +0.0225 ms
WordCounter.by_flow_without_partition                 787.66 - 1.99x slower +0.63 ms
WordCounter.by_flow_with_partition                    372.14 - 4.21x slower +2.05 ms
WordCounter.by_flow_with_independent_partition        207.71 - 7.54x slower +4.18 ms

##### With input {1000, 1000} #####
Name                                                     ips        average  deviation         median         99th %
WordCounter.by_flow_with_independent_partition         23.75       42.11 ms     ±3.60%       42.33 ms       44.15 ms
WordCounter.by_flow_with_partition                     21.75       45.99 ms     ±2.75%       46.17 ms       48.49 ms
WordCounter.by_enum                                    20.13       49.68 ms     ±1.87%       49.59 ms       54.46 ms
WordCounter.by_stream                                  19.71       50.73 ms     ±2.41%       50.60 ms       56.63 ms
WordCounter.by_flow_without_partition                  11.90       84.05 ms     ±0.62%       84.05 ms       85.40 ms

Comparison: 
WordCounter.by_flow_with_independent_partition         23.75
WordCounter.by_flow_with_partition                     21.75 - 1.09x slower +3.88 ms
WordCounter.by_enum                                    20.13 - 1.18x slower +7.57 ms
WordCounter.by_stream                                  19.71 - 1.20x slower +8.62 ms
WordCounter.by_flow_without_partition                  11.90 - 2.00x slower +41.94 ms

##### With input {10000, 10000} #####
Name                                                     ips        average  deviation         median         99th %
WordCounter.by_flow_with_independent_partition          0.34         2.92 s     ±0.16%         2.92 s         2.93 s
WordCounter.by_flow_without_partition                   0.32         3.17 s     ±5.50%         3.17 s         3.29 s
WordCounter.by_flow_with_partition                      0.31         3.21 s     ±1.43%         3.21 s         3.25 s
WordCounter.by_stream                                   0.25         4.07 s     ±0.33%         4.07 s         4.08 s
WordCounter.by_enum                                    0.165         6.06 s     ±0.00%         6.06 s         6.06 s

Comparison: 
WordCounter.by_flow_with_independent_partition          0.34
WordCounter.by_flow_without_partition                   0.32 - 1.08x slower +0.24 s
WordCounter.by_flow_with_partition                      0.31 - 1.10x slower +0.29 s
WordCounter.by_stream                                   0.25 - 1.39x slower +1.15 s
WordCounter.by_enum                                    0.165 - 2.07x slower +3.14 s
  • {100, 100}だとEnum版が最も速かったです.
  • {1000, 1000}{10000, 10000}だとflow_with_independent_partitionが最も速かったです.
  • {10000, 10000}だと,flow_without_partitionの結果が不正になります.

疑問点

{1000, 1000}{10000, 10000}の時に,flow_with_partitionよりもflow_with_independent_partitionの方が高速になる理由,すなわち,|> Flow.map(&String.trim/1)の実行の直後にFlow.partition/2を入れた方が高速になる理由がよくわかりません.

8
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?