LoginSignup
16
2

More than 1 year has passed since last update.

Elixir Livebook で分散画像処理

Last updated at Posted at 2022-11-27

はじめに

Elixir で画像を複数ノードで分散処理してみます

※同一マシン上で別ノードを起動します。別マシンで起動する場合はまた別途

この記事は @zacky1972 さんが ElixirConf US 2022 で発表した内容の一部を Livebook 上で実行したものです

ElixirConf US 2022 の @zacky1972 さんの発表動画はこちら

参考にした @zacky1972 さんの Gist はこちら

実行したノートブックはこちら

https://github.com/RyoWakabayashi/elixir-learning/blob/main/livebooks/distributed_image_processing/distributed_main.livemd
https://github.com/RyoWakabayashi/elixir-learning/blob/main/livebooks/distributed_image_processing/distributed_worker_1.livemd
https://github.com/RyoWakabayashi/elixir-learning/blob/main/livebooks/distributed_image_processing/distributed_worker_2.livemd
https://github.com/RyoWakabayashi/elixir-learning/blob/main/livebooks/distributed_image_processing/distributed_worker_3.livemd
https://github.com/RyoWakabayashi/elixir-learning/blob/main/livebooks/distributed_image_processing/distributed_worker_4.livemd

前回記事の画像分割、並列画像処理はこちら

実行環境

以下のリポジトリーのコンテナ上で実行しています

Livebook におけるノード

Livebook では実行しているノートブック毎に別ノードが起動されています

Livebook の左メニュー、 IC チップらしきアイコンをクリックすると、 Node name の項目で現在のノートブックのノード名が確認できます

スクリーンショット 2022-11-27 20.51.35.png

今回は分散処理を呼び出すメインと、分散処理を実行するワーカー 1 〜 4 のノートブックをそれぞれ実行します

メインの準備

ノートブックを起動して、以下のコードを実行してセットアップします

Mix.install(
  [
    {:download, "~> 0.0.4"},
    {:evision, "~> 0.1"},
    {:kino, "~> 0.7"},
    {:nx, "~> 0.4"},
    {:flow, "~> 1.2"},
    {:benchee, "~> 1.1"}
  ],
  system_env: [
    {"EVISION_PRECOMPILED_CACHE_DIR", "/tmp/.cache"}
  ]
)

セットアップ対象

  • download: データダウンロード
  • evision: 画像処理
  • kino: 出力可視化
  • nx: 行列演算
  • flow: 並列処理
  • benchee: ベンチマーク

環境変数 EVISION_PRECOMPILED_CACHE_DIR を指定することで、メインと各ワーカーで Evision のキャッシュを使いまわします

画像ダウンロード

処理する画像をダウンロードしてきます

# 再実行時、Download.from()でeexistエラーになるのを防止
File.rm("Lenna_%28test_image%29.png")

lenna =
  Download.from("https://upload.wikimedia.org/wikipedia/en/7/7d/Lenna_%28test_image%29.png")
  |> elem(1)

画像を読み込みます

mat = Evision.imread(lenna)

スクリーンショット 2022-11-11 13.16.30.png

処理内容

分散処理の内容を個別に実行してみます

まず、分散処理ではメインと各ワーカーの間でデータを送受信するため、 Evision のマトリックスからバイナリに変換する必要があります

また、ワーカー上で Evision のマトリックスに戻すため、元の型と形も送受信します

img = Evision.Mat.to_nx(mat)

type = Nx.type(img)
shape = Nx.shape(img)
binary = Nx.to_binary(img)

今回は閾値処理と、文字の描画を行います

分散処理実行時間、どのノードで実行されたのか分かりやすいよう、ノード名を描画するようにしています

dst_binary =
  binary
  # バイナリからマトリックスに変換
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()
  # 閾値処理
  # image processing
  |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
  |> elem(1)
  # 文字を描画
  |> Evision.putText(
    # 自分のノード名
    Node.self() |> Atom.to_string(),
    # 描画位置
    {10, 30},
    # フォント
    Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
    # 文字サイズ
    1.0,
    # 文字色
    {0, 0, 0},
    # 文字の太さ
    [{:thickness, 2}]
  )
  # マトリックスからバイナリに変換
  |> Evision.Mat.to_nx()
  |> Nx.to_binary()
  |> dbg()

スクリーンショット 2022-11-27 21.02.59.png

ワーカーから返ってきたバイナリを Evision のマトリックスに変換して表示します

dst_img =
  dst_binary
  |> Nx.from_binary(type)
  |> Nx.reshape(shape)
  |> Evision.Mat.from_nx_2d()

スクリーンショット 2022-11-27 21.04.29.png

今回はノートブック内で実行しているので、自分のノード名が描画されています

ノード接続確認

ノートブック間で接続できることを確認します

現在の接続ノードを確認します

Node.list(:connected)

スクリーンショット 2022-11-27 21.09.17.png

メインのノートブックは Livebook 本体のノードだけに接続されています

新しくノートブックを起動し、以下のコードでセットアップします
※以降、コードブロックの上に「★○○」の形でどのノートブック用のコードなのかを示します

★ワーカー1

Mix.install(
  [
    {:evision, "~> 0.1"},
    {:kino, "~> 0.7"},
    {:nx, "~> 0.4"},
    {:flow, "~> 1.2"}
  ],
  system_env: [
    {"EVISION_PRECOMPILED_CACHE_DIR", "/tmp/.cache"}
  ]
)

ワーカーのノード名を取得します

★ワーカー1

Node.self()

スクリーンショット 2022-11-27 21.12.05.png

メインノートブックにワーカーノートブックのノード名用入力エリアを用意し、ワーカー1のノード名を入力します

★メイン

worker_1_input = Kino.Input.text("WORKER_1_NODE_NAME")

スクリーンショット 2022-11-27 21.14.05.png

文字列のままだと使いにくいので、 atom に変換しておきます

★メイン

worker_1_atom =
  worker_1_input
  |> Kino.Input.read()
  |> String.to_atom()

スクリーンショット 2022-11-27 21.15.19.png

ワーカー1に接続します

★メイン

Node.connect(worker_1_atom)

true が返ってくれば接続成功です

スクリーンショット 2022-11-27 21.16.23.png

現在の接続ノードを確認します

★メイン

Node.list(:connected)

スクリーンショット 2022-11-27 21.17.28.png

Livebook 本体のノードに加えて、ワーカー1のノードにも接続できていることが確認できます

後で改めて接続するので、一旦接続解除しておきます

★メイン

Node.disconnect(worker_1_atom)

★メイン

Node.list(:connected)

スクリーンショット 2022-11-27 21.09.17.png

画像のコピー

複数画像を分散して処理するため、画像を32枚コピーしておきます

src_file_ext = Path.extname(lenna)
src_file_basename = Path.basename(lenna, src_file_ext)

src_files =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&"#{src_file_basename}_#{&1}#{src_file_ext}")

# コピー枚数
copy_count = 32

src_file_paths =
  mat
  |> List.duplicate(copy_count)
  |> Enum.zip(src_files)
  |> Enum.map(fn {img, dst_file} ->
    Evision.imwrite(dst_file, img)
    dst_file
  end)

スクリーンショット 2022-11-27 21.20.39.png

処理の定義

分散処理するモジュールを定義します

distribute にワーカーと画像の一覧を渡すと、各ワーカーに元画像をバイナリに変換して送信します

各ワーカーが process_image でバイナリをマトリックスに変換し、画像処理を実行します

画像処理後のマトリックスを再びバイナリに変換してワーカーがメインに返します

メインは受信したバイナリをマトリックスに変換し、画像ファイルに保存します

このモジュールは全ノード共通なので、メイン、ワーカー1の両方で同じものを実行しておきます

★メイン、ワーカー1

defmodule DistributedImageProcessing do
  def distribute(workers, images_stream) do
    # ワーカーノードに接続する
    Enum.each(workers, &Node.connect/1)

    # worker_stream is generated repeatedly
    worker_stream =
      Stream.repeatedly(fn -> workers end)
      |> Stream.flat_map(& &1)

    sender_pid = self()

    worker_stream
    |> Stream.zip(images_stream)
    |> Flow.from_enumerable(stages: 4, max_demand: 1)
    |> Flow.map(fn {worker, image} ->
      IO.puts("enter spawn_link")

      {
        Node.spawn_link(worker, fn ->
          # worker receives an image from main
          receive do
            {:img, sender_pid, img} ->
              # call process_image
              {dst_file, img} = process_image(img)

              # An image should be converted into binary, shape and type before sending.
              binary = Nx.to_binary(img)
              shape = Nx.shape(img)
              type = Nx.type(img)

              send(sender_pid, {dst_file, type, shape, binary})
              IO.puts("respond")
          end
        end),
        image
      }
    end)
    |> Flow.map(fn {pid, src_file} ->
      IO.puts("enter reader")

      img =
        src_file
        |> Evision.imread()
        |> Evision.Mat.to_nx()

      # An image should be converted into binary, shape and type before sending.
      binary = Nx.to_binary(img)
      shape = Nx.shape(img)
      type = Nx.type(img)

      send(pid, {:img, sender_pid, {src_file, type, shape, binary}})
    end)
    |> Enum.to_list()
    |> Enum.map(fn _ ->
      IO.puts("enter receiver")

      receive do
        {dst_file, type, shape, binary} ->
          save_image({dst_file, type, shape, binary})
      end
    end)
    |> Enum.to_list()
  end

  def process_image({src_file, type, shape, binary}) do
    IO.puts("enter processor")

    # file name conversion
    src_file_ext = Path.extname(src_file)
    src_file_basename = Path.basename(src_file, src_file_ext)
    dst_file = "#{src_file_basename}_d#{src_file_ext}"

    dst_img =
      binary
      # reconstruction of an image
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()
      # image processing
      |> Evision.threshold(127, 255, Evision.Constant.cv_THRESH_BINARY())
      |> elem(1)
      |> Evision.putText(
        Node.self() |> Atom.to_string(),
        {10, 30},
        Evision.Constant.cv_FONT_HERSHEY_SIMPLEX(),
        1.0,
        {0, 0, 0},
        [{:thickness, 2}]
      )
      |> Evision.Mat.to_nx()

    {dst_file, dst_img}
  end

  def save_image({dst_file, type, shape, binary}) do
    img =
      binary
      |> Nx.from_binary(type)
      |> Nx.reshape(shape)
      |> Evision.Mat.from_nx_2d()

    Evision.imwrite(dst_file, img)

    dst_file
  end
end

自身のノードで1つだけ実行

メインのノートブックで直接画像処理だけ実行してみます

★メイン

{lenna, type, shape, binary}
|> DistributedImageProcessing.process_image()
|> then(fn {dst_file, dst_img} ->
  {dst_file, Nx.type(dst_img), Nx.shape(dst_img), Nx.to_binary(dst_img)}
end)
|> DistributedImageProcessing.save_image()
|> Evision.imread()
|> dbg()

スクリーンショット 2022-11-27 21.29.14.png

きちんと処理できています

ワーカーノード1つで1枚だけ実行

ワーカー1で1枚だけ処理してみましょう

★メイン

[worker_1_atom]
|> DistributedImageProcessing.distribute([lenna])
|> Enum.at(0)
|> Evision.imread()

スクリーンショット 2022-11-27 21.30.37.png

ワーカー1のノード名が描画されており、確かにメインではなくワーカー1で画像処理が実行されています

ワーカーノード1つで32枚実行

ではコピーした32枚の画像に対して実行してみましょう

まず対象画像の一覧を取得します

★メイン

# 存在するファイルを取得
images_stream =
  Stream.unfold(0, fn counter -> {counter, counter + 1} end)
  |> Stream.map(&"#{src_file_basename}_#{&1}#{src_file_ext}")
  |> Stream.take_while(fn filename -> File.exists?(filename) end)

実行して、先頭3件の実行結果を見てみましょう

★メイン

[worker_1_atom]
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.slice(0..5)
|> Enum.map(fn dst_filename ->
  dst_filename
  |> Evision.imread()
  |> Kino.render()
end)

スクリーンショット 2022-11-27 21.33.49.png

3枚とも処理されていますね

ワーカーノード4つで分散処理

3つ新しくノートブックを起動し、ワーカー1と同じ内容を実行します

★ワーカー2、3、4

Mix.install(
  [
    {:evision, "~> 0.1"},
    {:kino, "~> 0.7"},
    {:nx, "~> 0.4"},
    {:flow, "~> 1.2"}
  ],
  system_env: [
    {"EVISION_PRECOMPILED_CACHE_DIR", "/tmp/.cache"}
  ]
)

★ワーカー2、3、4

Node.self()

★ワーカー2、3、4

defmodule DistributedImageProcessing do
...
省略
...
end

メインでそれぞれのノード名を入力します

★メイン

worker_2_input = Kino.Input.text("WORKER_2_NODE_NAME")

★メイン

worker_3_input = Kino.Input.text("WORKER_3_NODE_NAME")

★メイン

worker_4_input = Kino.Input.text("WORKER_4_NODE_NAME")

スクリーンショット 2022-11-27 21.37.40.png

ワーカー1〜4のノード名を atom にします

★メイン

workers =
  [worker_1_input, worker_2_input, worker_3_input, worker_4_input]
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)

ワーカー1〜4で32枚の画像を処理します

★メイン

workers
|> DistributedImageProcessing.distribute(images_stream)
|> Enum.slice(0..5)
|> Enum.map(fn dst_filename ->
  dst_filename
  |> Evision.imread()
  |> Kino.render()
end)

スクリーンショット 2022-11-27 21.39.49.png

それぞれ異なるノード名が描画されました

速度比較

せっかくなので、ノード数による速度比較をしてみましょう

速度比較用の関数を用意します

★メイン

distributed = fn worker_input_list ->
  worker_input_list
  |> Enum.map(fn input ->
    input
    |> Kino.Input.read()
    |> String.to_atom()
  end)
  |> DistributedImageProcessing.distribute(images_stream)
end

Benchee で速度比較を実行します

★メイン

Benchee.run(%{
  "1 worker" => fn -> distributed.([worker_1_input]) end,
  "2 workers" => fn -> distributed.([worker_1_input, worker_2_input]) end,
  "4 workers" => fn -> distributed.([worker_1_input, worker_2_input, worker_3_input, worker_4_input]) end
})

実行結果は以下のようになりました

Name                ips        average  deviation         median         99th %
4 workers          1.78      561.06 ms     ±6.85%      554.79 ms      659.99 ms
2 workers          1.69      591.77 ms     ±7.87%      574.77 ms      694.21 ms
1 worker           1.57      638.71 ms     ±6.90%      632.23 ms      717.61 ms

Comparison: 
4 workers          1.78
2 workers          1.69 - 1.05x slower +30.71 ms
1 worker           1.57 - 1.14x slower +77.65 ms

実態は同じ1台のマシンの中なのでそんなに速くはなりませんが、4ワーカーが最も速くなりました

まとめ

Livebook でも各ノートブックをノードとして接続することで、分散処理が実行できました

次は複数台のマシンで Livebook を起動して分散させてみましょう

16
2
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
2