13
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

はじめに

以前書いた記事を組み合わせて、衛星データの並列分散処理を実装します

日本発の衛星データプラットフォーム Tellus から衛星データを取得し、青森県の植生(植物がどれくらい生えているか)を視覚化します

今回も Livebook を使用します

実行環境

MacBook Pro 上のコンテナで実行しています

コンテナ定義はこちら

仮想マシンにはメモリ 12GB 、 CPU 6コアを割り当てています

使用可能な GPU は積んでいません

事前準備

以下の記事に従って、目的とする地域周辺の衛星データをローカルにダウンロードしておきます

提供:だいち(ALOS) AVNIR-2 データ(JAXA)

サーバー側(演算を実行する側)

植生の可視化には NDVI (正規化植生指標)を用います

今回は二つのサーバー側ノードで NDVI の行列演算を実行し、それを一つのクライアント側ノードから呼び出します

実装したノートブックはこちら

サーバー側セットアップ

Livebook を起動し、新しいノートブックのセットアップセルで以下のコードを実行します

Mix.install(
  [
    {:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
    {:evision, "~> 0.1.30"},
    {:exla, "~> 0.5"},
    {:kino, "~> 0.9"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend
    ]
  ]
)

以下の依存モジュールをインストールしています

  • Nx: 行列演算
  • Evision: 画像処理
  • EXLA: 行列演算の高速化
  • Kino: Livebook の UI/UX

また、 Nx による行列演算を EXLA を使って高速化する設定をしています

Nx は以下のバグフィックスが必要なため、 GitHub 上の最新版を使います

サーバー側処理定義

NDVI の演算用モジュールを定義します

NDVI の計算式は以下のとおりです

$ NDVI = \frac{NIR - Red}{NIR + Red} $

NIR = 近赤外線、 Red = 赤色光です

この NDVI の値は NIR と Red の値によって以下のように変化します

  • NIR=255(最大),Red=0(最小) の場合: 1(最大)
  • NIR=0(最小),Red=255(最大) の場合: -1(最小)

従って、 NDVI の各画素は -1 〜 1 の範囲に分布しています

これを画像として表示する場合、 -1 〜 1 を 0 〜 255 に変換する必要があります

defmodule NDVIService do
  import Nx.Defn

  defn calc(input) do
    # 赤色光バンド
    red_tensor = input[0][[0..-1//1, 0..-1//1, 0]]
    # 近赤外線バンド
    nir_tensor = input[0][[0..-1//1, 0..-1//1, 1]]

    Nx.select(
      # 0 除算をしないため、 NIR と Red の両方が 0 でないところだけ演算する
      (red_tensor != 0) * (nir_tensor != 0),
      # NDVI の演算
      (nir_tensor - red_tensor) / (nir_tensor + red_tensor),
      0
    )
    # -1 ~ 1 の値を 0 ~ 255 に変換する 
    |> then(fn tensor -> tensor * 128 + 128 end)
    |> Nx.as_type(:u8)
    |> Nx.new_axis(0)
  end
end

これは完全に「映え」のタメですが、このノードでの処理結果を表示するための枠を作っておきます

frame = Kino.Frame.new()

NDVI 演算処理を別ノードから呼び出すための Nx.Serving を作成します

serving =
  Nx.Serving.new(fn opts -> Nx.Defn.jit(&NDVIService.calc/1, opts) end, compiler: EXLA)
  |> Nx.Serving.client_preprocessing(fn input ->
    # 前処理 テンソルをバッチに変換する
    batch = Nx.Batch.stack([input])
    {batch, :client_info}
  end)
  |> Nx.Serving.client_postprocessing(fn output, _metadata, _multi? ->
    # 後処理 テンソルの次元を一つ落とし、処理結果を枠に表示する
    res = output[0]

    res
    |> Evision.Mat.from_nx_2d()
    |> Evision.resize({320, 320})
    # 見やすいように色を付ける
    |> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
    |> then(&Kino.Frame.render(frame, &1))

    res
  end)
  |> Nx.Serving.distributed_postprocessing(fn output ->
    # 分散時(別ノードからの実行時)のための後処理
    # バイナリバックエンドに変換する
    Nx.backend_copy(output, Nx.BinaryBackend)
  end)

Nx.Serving を別ノードから呼ばれる子プロセスとして起動します

Kino.start_child({Nx.Serving, name: NDVIServer, serving: serving})

サーバー側接続情報取得

クライアント側から接続するためのノード名、クッキーを表示します

{node(), Node.get_cookie()}

実行結果は以下のようになります

{:"ifixyaky-livebook_server@2b27d4f2e917", :trW9eiqVNurqLbMKdkAJ4Gs4d8L0XAgh}

サーバー側はもう一つノートブックを開き、同じコードを実行しておきます

クライアント側

衛星データを読み込んでサーバー側ノードに投げ、 NDVI 算出結果を受け取ります

本来は大きなまま処理するのですが、今回は「映え」のため、小さくしてから処理します

実装したノートブックはこちら

クライアント側セットアップ

Mix.install(
  [
    {:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
    {:evision, "~> 0.1.30"},
    {:exla, "~> 0.5"},
    {:kino, "~> 0.9"},
    {:flow, "~> 1.2"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend
    ]
  ]
)

サーバー側の依存モジュールに加え、並列処理のための Flow を追加しています

クライアント側衛星データ処理定義

衛星データ( Tellus からダウンロードした AVNIR-2 データ)のヘッダー情報を読み込むためのモジュールを定義します

今回は NDVI を算出したいだけなので、それに必要なものだけ読み込んでいます

defmodule BandInfo do
  defstruct gain: 0.0, offset: 0.0
end

defmodule HeaderInfo do
  defstruct red: %BandInfo{},
            nir: %BandInfo{},
            date: nil

  def get_string(info, start, value) do
    info
    |> String.slice(start, value)
    |> String.trim()
  end

  def get_value(info, start, value) do
    info
    |> get_string(start, value)
    |> String.to_float()
  end

  def read(hdr_file_path) do
    info = File.read!(hdr_file_path)

    %HeaderInfo{
      # 赤色光バンド
      red: %BandInfo{
        gain: get_value(info, 1752, 8),
        offset: get_value(info, 1760, 8)
      },
      # 近赤外線バンド
      nir: %BandInfo{
        gain: get_value(info, 1768, 8),
        offset: get_value(info, 1776, 8)
      },
      date: get_string(info, 192, 8)
    }
  end
end

衛星データの読込、サーバー側との連携処理をするモジュール定義です

defmodule NDVIClient do
  import Nx.Defn

  def read_header(file_path_list) do
    file_path_list
    |> Enum.find(fn file -> Path.extname(file) == ".txt" end)
    |> HeaderInfo.read()
  end

  def get_band_tensor(file_path_list, prefix) do
    file_path_list
    |> Enum.find(fn file ->
      file
      |> Path.basename()
      |> String.starts_with?(prefix)
    end)
    |> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
    # 今回は処理を速くするため小さくする
    |> Evision.resize({640, 640})
    |> Evision.Mat.to_nx(EXLA.Backend)
  end

  defn get_luminance(tensor, gain, offset) do
    tensor * gain + offset
  end

  def calc(file_path_list) do
    # ヘッダー情報
    header_info = read_header(file_path_list)

    # 赤色光バンド画像をテンソルとして取得
    red_tensor =
      file_path_list
      |> get_band_tensor("IMG-03")
      |> get_luminance(header_info.red.gain, header_info.red.offset)

    # 近赤外線バンド画像をテンソルとして取得
    nir_tensor =
      file_path_list
      |> get_band_tensor("IMG-04")
      |> get_luminance(header_info.nir.gain, header_info.nir.offset)

    # 赤色光バンドと近赤外線バンドをまとめてサーバー側に投げ、結果を取得する
    ndvi_img =
      [red_tensor, nir_tensor]
      |> Nx.stack(axis: -1)
      |> then(fn input ->
        Nx.Serving.batched_run(NDVIServer, input, &Nx.backend_copy/1)
      end)
      |> Evision.Mat.from_nx_2d()

    {header_info.date, ndvi_img}
  end
end

サーバー側との接続

サーバー側の接続情報を入力するためのテキスト入力を作成します

server_node_inputs =
  ["A", "B"]
  |> Enum.into(%{}, fn node_id ->
    {
      node_id,
      %{
        node: Kino.Input.text("SERVER_#{node_id}_NODE_NAME"),
        cookie: Kino.Input.text("SERVER_#{node_id}_COOKIE")
      }
    }
  end)

server_node_inputs
|> Enum.map(fn {_, inputs} ->
  [inputs.node, inputs.cookie]
end)
|> List.flatten()
|> Kino.Layout.grid(columns: 2)

表示されたテキスト入力に各サーバー側のノード名、クッキーを入力します

接続情報入力

入力された情報を使ってサーバーに接続します

server_node_inputs
|> Enum.map(fn {_, inputs} ->
  node_name =
    inputs.node
    |> Kino.Input.read()
    |> String.to_atom()

  cookie =
    inputs.cookie
    |> Kino.Input.read()
    |> String.to_atom()

  Node.set_cookie(node_name, cookie)

  Node.connect(node_name)
end)

結果が [true, true] になれば接続できています(失敗したノードは結果が false になります)

並列分散処理実行

処理対象とする衛星データのシーン ID を指定します

今回の記事では /tmp/<シーン ID>/ 配下に各シーンの衛星データ(ヘッダー情報、各バンドの画像)が保存されていることを前提にしています

scene_id_list = [
  "202ce08d-ba4b-4ffe-8165-109fd3a8b917",
  "34d8dc6f-fdd1-4542-a038-c1235a5a97fa",
  "12ad308b-6ce1-40ec-9ebf-f0215c30500e",
  "e2e85b2e-a208-4a65-87fd-b92721b037a8",
  "208a3618-7700-421b-bf05-fd59551cc1aa",
  "d5ce7320-5b25-4ced-bda5-0e25a9d75940",
  "9d14706f-cee7-4eb4-9151-2558609c3de0",
  "3f4555ac-eaf3-4066-a1ba-20bb1ec1c0b3"
]

シーン ID を Flow.map で並列実行します

stages=2 で 2 並列に指定しています

ndvi_list =
  scene_id_list
  |> Flow.from_enumerable(stages: 2, max_demand: 1)
  |> Flow.map(fn scene_id ->
    "/tmp/#{scene_id}"
    |> File.ls!()
    |> Enum.map(fn filename -> Path.join(["/tmp", scene_id, filename]) end)
    |> NDVIClient.calc()
  end)
  |> Enum.to_list()

実行すると、サーバー側ノードで各シーンの NDVI が計算され、結果が表示されていきます

ndvi_distributed.gif

最後に、クライアント側で受け取った NDVI の計算結果をタブ表示します

ndvi_list
|> Enum.map(fn {date, ndvi_tensor} ->
  img =
    ndvi_tensor
    |> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())

  {date, img}
end)
|> Kino.Layout.tabs()

ndvi_tabs.gif

まとめ

Nx.Serving を使うことで、衛星データの並列分散処理が実装できました

3 つのノートブックが連動して更新される様子は、まさに Elixir 、 Livebook ならではの醍醐味ではないでしょうか

今回は環境がなかったので同一端末内で実行しましたが、もちろん別端末でも分散実行可能です(そうしないと本来分散の意味がない)

ただし、分散するための前処理、後処理、そしてノード間通信があるため、画像データが大きいとオーバーヘッドが大きくなり、結果として遅くなります

並列分散処理を有効に活用するためには、もっと重たい処理をたくさんの端末で分散しないと効果がありません

近いうちに機械学習モデルによる推論を複数端末で分散実行してみたいと思います

おまけ

衛星データの並列処理

単純に一つのノードで同様の演算を実行します

実装したノートブックはこちら

セットアップ(分散処理のクライアント側と同じ)

Mix.install(
  [
    {:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
    {:evision, "~> 0.1.30"},
    {:exla, "~> 0.5"},
    {:kino, "~> 0.9"},
    {:flow, "~> 1.2"}
  ],
  config: [
    nx: [
      default_backend: EXLA.Backend
    ]
  ]
)

ヘッダー情報(分散処理のクライアント側と同じ)

defmodule BandInfo do
  defstruct gain: 0.0, offset: 0.0
end

defmodule HeaderInfo do
  defstruct red: %BandInfo{},
            nir: %BandInfo{},
            date: nil

  def get_string(info, start, value) do
    info
    |> String.slice(start, value)
    |> String.trim()
  end

  def get_value(info, start, value) do
    info
    |> get_string(start, value)
    |> String.to_float()
  end

  def read(hdr_file_path) do
    info = File.read!(hdr_file_path)

    %HeaderInfo{
      # 赤色光バンド
      red: %BandInfo{
        gain: get_value(info, 1752, 8),
        offset: get_value(info, 1760, 8)
      },
      # 近赤外線バンド
      nir: %BandInfo{
        gain: get_value(info, 1768, 8),
        offset: get_value(info, 1776, 8)
      },
      date: get_string(info, 192, 8)
    }
  end
end

処理済画像の表示枠

frame = Kino.Frame.new()

NDVI 演算処理モジュール

defmodule NDVIClient do
  import Nx.Defn

  def read_header(file_path_list) do
    file_path_list
    |> Enum.find(fn file -> Path.extname(file) == ".txt" end)
    |> HeaderInfo.read()
  end

  def get_band_tensor(file_path_list, prefix) do
    file_path_list
    |> Enum.find(fn file ->
      file
      |> Path.basename()
      |> String.starts_with?(prefix)
    end)
    |> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
    # 今回は処理を速くするため小さくする
    |> Evision.resize({640, 640})
    |> Evision.Mat.to_nx(EXLA.Backend)
  end

  defn get_luminance(tensor, gain, offset) do
    tensor * gain + offset
  end

  defn calc_ndvi(red_tensor, nir_tensor) do
    ndvi_tensor =
      Nx.select(
        # 0 除算をしないため、 NIR と Red の両方が 0 でないところだけ演算する
        (red_tensor != 0) * (nir_tensor != 0),
        # NDVI の演算
        (nir_tensor - red_tensor) / (nir_tensor + red_tensor),
        0
      )
      |> then(fn tensor -> tensor * 128 + 128 end)
      |> Nx.as_type(:u8)

    ndvi_tensor
  end

  def calc(file_path_list, frame) do
    header_info = read_header(file_path_list)

    red_tensor =
      file_path_list
      |> get_band_tensor("IMG-03")
      |> get_luminance(header_info.red.gain, header_info.red.offset)

    nir_tensor =
      file_path_list
      |> get_band_tensor("IMG-04")
      |> get_luminance(header_info.nir.gain, header_info.nir.offset)

    ndvi_img =
      calc_ndvi(red_tensor, nir_tensor)
      |> Evision.Mat.from_nx_2d()

    ndvi_img
    |> Evision.resize({320, 320})
    |> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
    |> then(&Kino.Frame.render(frame, &1))

    {header_info.date, ndvi_img}
  end
end

シーン一覧

scene_id_list = [
  "202ce08d-ba4b-4ffe-8165-109fd3a8b917",
  "34d8dc6f-fdd1-4542-a038-c1235a5a97fa",
  "12ad308b-6ce1-40ec-9ebf-f0215c30500e",
  "e2e85b2e-a208-4a65-87fd-b92721b037a8",
  "208a3618-7700-421b-bf05-fd59551cc1aa",
  "d5ce7320-5b25-4ced-bda5-0e25a9d75940",
  "9d14706f-cee7-4eb4-9151-2558609c3de0",
  "3f4555ac-eaf3-4066-a1ba-20bb1ec1c0b3"
]

並列処理の実行

ndvi_list =
  scene_id_list
  |> Flow.from_enumerable(stages: 2, max_demand: 1)
  |> Flow.map(fn scene_id ->
    "/tmp/#{scene_id}"
    |> File.ls!()
    |> Enum.map(fn filename -> Path.join(["/tmp", scene_id, filename]) end)
    |> NDVIClient.calc(frame)
  end)
  |> Enum.to_list()

結果のタブ表示

ndvi_list
|> Enum.map(fn {date, ndvi_tensor} ->
  img =
    ndvi_tensor
    |> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())

  {date, img}
end)
|> Kino.Layout.tabs()

速度比較

それぞれ、 NDVI 演算処理のセル実行時間を比較(単位は秒)

  • 読込時の画像サイズ 640 * 640 の場合

    処理形態 2並列 4並列
    並列処理 15.5 49.0
    並列分散処理 17.8 27.5
  • 読込時にリサイズしない( 8,000 * 7,244 )の場合

    処理形態 2並列 4並列
    並列処理 69.0 70.1
    並列分散処理 75.5 171.9

実行環境のメモリや CPU 、 GPU によって大きく変わってくると思いますが、画像が大きいとき、おそらく前処理、後処理のオーバーヘッドがかなり大きくなっていそうです

13
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?