はじめに
以前書いた記事を組み合わせて、衛星データの並列分散処理を実装します
日本発の衛星データプラットフォーム Tellus から衛星データを取得し、青森県の植生(植物がどれくらい生えているか)を視覚化します
今回も Livebook を使用します
実行環境
MacBook Pro 上のコンテナで実行しています
コンテナ定義はこちら
仮想マシンにはメモリ 12GB 、 CPU 6コアを割り当てています
使用可能な GPU は積んでいません
事前準備
以下の記事に従って、目的とする地域周辺の衛星データをローカルにダウンロードしておきます
提供:だいち(ALOS) AVNIR-2 データ(JAXA)
サーバー側(演算を実行する側)
植生の可視化には NDVI (正規化植生指標)を用います
今回は二つのサーバー側ノードで NDVI の行列演算を実行し、それを一つのクライアント側ノードから呼び出します
実装したノートブックはこちら
サーバー側セットアップ
Livebook を起動し、新しいノートブックのセットアップセルで以下のコードを実行します
Mix.install(
[
{:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
{:evision, "~> 0.1.30"},
{:exla, "~> 0.5"},
{:kino, "~> 0.9"}
],
config: [
nx: [
default_backend: EXLA.Backend
]
]
)
以下の依存モジュールをインストールしています
また、 Nx による行列演算を EXLA を使って高速化する設定をしています
Nx は以下のバグフィックスが必要なため、 GitHub 上の最新版を使います
サーバー側処理定義
NDVI の演算用モジュールを定義します
NDVI の計算式は以下のとおりです
$ NDVI = \frac{NIR - Red}{NIR + Red} $
NIR = 近赤外線、 Red = 赤色光です
この NDVI の値は NIR と Red の値によって以下のように変化します
- NIR=255(最大),Red=0(最小) の場合: 1(最大)
- NIR=0(最小),Red=255(最大) の場合: -1(最小)
従って、 NDVI の各画素は -1 〜 1 の範囲に分布しています
これを画像として表示する場合、 -1 〜 1 を 0 〜 255 に変換する必要があります
defmodule NDVIService do
import Nx.Defn
defn calc(input) do
# 赤色光バンド
red_tensor = input[0][[0..-1//1, 0..-1//1, 0]]
# 近赤外線バンド
nir_tensor = input[0][[0..-1//1, 0..-1//1, 1]]
Nx.select(
# 0 除算をしないため、 NIR と Red の両方が 0 でないところだけ演算する
(red_tensor != 0) * (nir_tensor != 0),
# NDVI の演算
(nir_tensor - red_tensor) / (nir_tensor + red_tensor),
0
)
# -1 ~ 1 の値を 0 ~ 255 に変換する
|> then(fn tensor -> tensor * 128 + 128 end)
|> Nx.as_type(:u8)
|> Nx.new_axis(0)
end
end
これは完全に「映え」のタメですが、このノードでの処理結果を表示するための枠を作っておきます
frame = Kino.Frame.new()
NDVI 演算処理を別ノードから呼び出すための Nx.Serving を作成します
serving =
Nx.Serving.new(fn opts -> Nx.Defn.jit(&NDVIService.calc/1, opts) end, compiler: EXLA)
|> Nx.Serving.client_preprocessing(fn input ->
# 前処理 テンソルをバッチに変換する
batch = Nx.Batch.stack([input])
{batch, :client_info}
end)
|> Nx.Serving.client_postprocessing(fn output, _metadata, _multi? ->
# 後処理 テンソルの次元を一つ落とし、処理結果を枠に表示する
res = output[0]
res
|> Evision.Mat.from_nx_2d()
|> Evision.resize({320, 320})
# 見やすいように色を付ける
|> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
|> then(&Kino.Frame.render(frame, &1))
res
end)
|> Nx.Serving.distributed_postprocessing(fn output ->
# 分散時(別ノードからの実行時)のための後処理
# バイナリバックエンドに変換する
Nx.backend_copy(output, Nx.BinaryBackend)
end)
Nx.Serving を別ノードから呼ばれる子プロセスとして起動します
Kino.start_child({Nx.Serving, name: NDVIServer, serving: serving})
サーバー側接続情報取得
クライアント側から接続するためのノード名、クッキーを表示します
{node(), Node.get_cookie()}
実行結果は以下のようになります
{:"ifixyaky-livebook_server@2b27d4f2e917", :trW9eiqVNurqLbMKdkAJ4Gs4d8L0XAgh}
サーバー側はもう一つノートブックを開き、同じコードを実行しておきます
クライアント側
衛星データを読み込んでサーバー側ノードに投げ、 NDVI 算出結果を受け取ります
本来は大きなまま処理するのですが、今回は「映え」のため、小さくしてから処理します
実装したノートブックはこちら
クライアント側セットアップ
Mix.install(
[
{:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
{:evision, "~> 0.1.30"},
{:exla, "~> 0.5"},
{:kino, "~> 0.9"},
{:flow, "~> 1.2"}
],
config: [
nx: [
default_backend: EXLA.Backend
]
]
)
サーバー側の依存モジュールに加え、並列処理のための Flow を追加しています
クライアント側衛星データ処理定義
衛星データ( Tellus からダウンロードした AVNIR-2 データ)のヘッダー情報を読み込むためのモジュールを定義します
今回は NDVI を算出したいだけなので、それに必要なものだけ読み込んでいます
defmodule BandInfo do
defstruct gain: 0.0, offset: 0.0
end
defmodule HeaderInfo do
defstruct red: %BandInfo{},
nir: %BandInfo{},
date: nil
def get_string(info, start, value) do
info
|> String.slice(start, value)
|> String.trim()
end
def get_value(info, start, value) do
info
|> get_string(start, value)
|> String.to_float()
end
def read(hdr_file_path) do
info = File.read!(hdr_file_path)
%HeaderInfo{
# 赤色光バンド
red: %BandInfo{
gain: get_value(info, 1752, 8),
offset: get_value(info, 1760, 8)
},
# 近赤外線バンド
nir: %BandInfo{
gain: get_value(info, 1768, 8),
offset: get_value(info, 1776, 8)
},
date: get_string(info, 192, 8)
}
end
end
衛星データの読込、サーバー側との連携処理をするモジュール定義です
defmodule NDVIClient do
import Nx.Defn
def read_header(file_path_list) do
file_path_list
|> Enum.find(fn file -> Path.extname(file) == ".txt" end)
|> HeaderInfo.read()
end
def get_band_tensor(file_path_list, prefix) do
file_path_list
|> Enum.find(fn file ->
file
|> Path.basename()
|> String.starts_with?(prefix)
end)
|> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
# 今回は処理を速くするため小さくする
|> Evision.resize({640, 640})
|> Evision.Mat.to_nx(EXLA.Backend)
end
defn get_luminance(tensor, gain, offset) do
tensor * gain + offset
end
def calc(file_path_list) do
# ヘッダー情報
header_info = read_header(file_path_list)
# 赤色光バンド画像をテンソルとして取得
red_tensor =
file_path_list
|> get_band_tensor("IMG-03")
|> get_luminance(header_info.red.gain, header_info.red.offset)
# 近赤外線バンド画像をテンソルとして取得
nir_tensor =
file_path_list
|> get_band_tensor("IMG-04")
|> get_luminance(header_info.nir.gain, header_info.nir.offset)
# 赤色光バンドと近赤外線バンドをまとめてサーバー側に投げ、結果を取得する
ndvi_img =
[red_tensor, nir_tensor]
|> Nx.stack(axis: -1)
|> then(fn input ->
Nx.Serving.batched_run(NDVIServer, input, &Nx.backend_copy/1)
end)
|> Evision.Mat.from_nx_2d()
{header_info.date, ndvi_img}
end
end
サーバー側との接続
サーバー側の接続情報を入力するためのテキスト入力を作成します
server_node_inputs =
["A", "B"]
|> Enum.into(%{}, fn node_id ->
{
node_id,
%{
node: Kino.Input.text("SERVER_#{node_id}_NODE_NAME"),
cookie: Kino.Input.text("SERVER_#{node_id}_COOKIE")
}
}
end)
server_node_inputs
|> Enum.map(fn {_, inputs} ->
[inputs.node, inputs.cookie]
end)
|> List.flatten()
|> Kino.Layout.grid(columns: 2)
表示されたテキスト入力に各サーバー側のノード名、クッキーを入力します
入力された情報を使ってサーバーに接続します
server_node_inputs
|> Enum.map(fn {_, inputs} ->
node_name =
inputs.node
|> Kino.Input.read()
|> String.to_atom()
cookie =
inputs.cookie
|> Kino.Input.read()
|> String.to_atom()
Node.set_cookie(node_name, cookie)
Node.connect(node_name)
end)
結果が [true, true]
になれば接続できています(失敗したノードは結果が false
になります)
並列分散処理実行
処理対象とする衛星データのシーン ID を指定します
今回の記事では /tmp/<シーン ID>/
配下に各シーンの衛星データ(ヘッダー情報、各バンドの画像)が保存されていることを前提にしています
scene_id_list = [
"202ce08d-ba4b-4ffe-8165-109fd3a8b917",
"34d8dc6f-fdd1-4542-a038-c1235a5a97fa",
"12ad308b-6ce1-40ec-9ebf-f0215c30500e",
"e2e85b2e-a208-4a65-87fd-b92721b037a8",
"208a3618-7700-421b-bf05-fd59551cc1aa",
"d5ce7320-5b25-4ced-bda5-0e25a9d75940",
"9d14706f-cee7-4eb4-9151-2558609c3de0",
"3f4555ac-eaf3-4066-a1ba-20bb1ec1c0b3"
]
シーン ID を Flow.map
で並列実行します
stages=2
で 2 並列に指定しています
ndvi_list =
scene_id_list
|> Flow.from_enumerable(stages: 2, max_demand: 1)
|> Flow.map(fn scene_id ->
"/tmp/#{scene_id}"
|> File.ls!()
|> Enum.map(fn filename -> Path.join(["/tmp", scene_id, filename]) end)
|> NDVIClient.calc()
end)
|> Enum.to_list()
実行すると、サーバー側ノードで各シーンの NDVI が計算され、結果が表示されていきます
最後に、クライアント側で受け取った NDVI の計算結果をタブ表示します
ndvi_list
|> Enum.map(fn {date, ndvi_tensor} ->
img =
ndvi_tensor
|> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
{date, img}
end)
|> Kino.Layout.tabs()
まとめ
Nx.Serving を使うことで、衛星データの並列分散処理が実装できました
3 つのノートブックが連動して更新される様子は、まさに Elixir 、 Livebook ならではの醍醐味ではないでしょうか
今回は環境がなかったので同一端末内で実行しましたが、もちろん別端末でも分散実行可能です(そうしないと本来分散の意味がない)
ただし、分散するための前処理、後処理、そしてノード間通信があるため、画像データが大きいとオーバーヘッドが大きくなり、結果として遅くなります
並列分散処理を有効に活用するためには、もっと重たい処理をたくさんの端末で分散しないと効果がありません
近いうちに機械学習モデルによる推論を複数端末で分散実行してみたいと思います
おまけ
衛星データの並列処理
単純に一つのノードで同様の演算を実行します
実装したノートブックはこちら
セットアップ(分散処理のクライアント側と同じ)
Mix.install(
[
{:nx, github: "elixir-nx/nx", branch: "main", sparse: "nx", override: true},
{:evision, "~> 0.1.30"},
{:exla, "~> 0.5"},
{:kino, "~> 0.9"},
{:flow, "~> 1.2"}
],
config: [
nx: [
default_backend: EXLA.Backend
]
]
)
ヘッダー情報(分散処理のクライアント側と同じ)
defmodule BandInfo do
defstruct gain: 0.0, offset: 0.0
end
defmodule HeaderInfo do
defstruct red: %BandInfo{},
nir: %BandInfo{},
date: nil
def get_string(info, start, value) do
info
|> String.slice(start, value)
|> String.trim()
end
def get_value(info, start, value) do
info
|> get_string(start, value)
|> String.to_float()
end
def read(hdr_file_path) do
info = File.read!(hdr_file_path)
%HeaderInfo{
# 赤色光バンド
red: %BandInfo{
gain: get_value(info, 1752, 8),
offset: get_value(info, 1760, 8)
},
# 近赤外線バンド
nir: %BandInfo{
gain: get_value(info, 1768, 8),
offset: get_value(info, 1776, 8)
},
date: get_string(info, 192, 8)
}
end
end
処理済画像の表示枠
frame = Kino.Frame.new()
NDVI 演算処理モジュール
defmodule NDVIClient do
import Nx.Defn
def read_header(file_path_list) do
file_path_list
|> Enum.find(fn file -> Path.extname(file) == ".txt" end)
|> HeaderInfo.read()
end
def get_band_tensor(file_path_list, prefix) do
file_path_list
|> Enum.find(fn file ->
file
|> Path.basename()
|> String.starts_with?(prefix)
end)
|> Evision.imread(flags: Evision.Constant.cv_IMREAD_GRAYSCALE())
# 今回は処理を速くするため小さくする
|> Evision.resize({640, 640})
|> Evision.Mat.to_nx(EXLA.Backend)
end
defn get_luminance(tensor, gain, offset) do
tensor * gain + offset
end
defn calc_ndvi(red_tensor, nir_tensor) do
ndvi_tensor =
Nx.select(
# 0 除算をしないため、 NIR と Red の両方が 0 でないところだけ演算する
(red_tensor != 0) * (nir_tensor != 0),
# NDVI の演算
(nir_tensor - red_tensor) / (nir_tensor + red_tensor),
0
)
|> then(fn tensor -> tensor * 128 + 128 end)
|> Nx.as_type(:u8)
ndvi_tensor
end
def calc(file_path_list, frame) do
header_info = read_header(file_path_list)
red_tensor =
file_path_list
|> get_band_tensor("IMG-03")
|> get_luminance(header_info.red.gain, header_info.red.offset)
nir_tensor =
file_path_list
|> get_band_tensor("IMG-04")
|> get_luminance(header_info.nir.gain, header_info.nir.offset)
ndvi_img =
calc_ndvi(red_tensor, nir_tensor)
|> Evision.Mat.from_nx_2d()
ndvi_img
|> Evision.resize({320, 320})
|> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
|> then(&Kino.Frame.render(frame, &1))
{header_info.date, ndvi_img}
end
end
シーン一覧
scene_id_list = [
"202ce08d-ba4b-4ffe-8165-109fd3a8b917",
"34d8dc6f-fdd1-4542-a038-c1235a5a97fa",
"12ad308b-6ce1-40ec-9ebf-f0215c30500e",
"e2e85b2e-a208-4a65-87fd-b92721b037a8",
"208a3618-7700-421b-bf05-fd59551cc1aa",
"d5ce7320-5b25-4ced-bda5-0e25a9d75940",
"9d14706f-cee7-4eb4-9151-2558609c3de0",
"3f4555ac-eaf3-4066-a1ba-20bb1ec1c0b3"
]
並列処理の実行
ndvi_list =
scene_id_list
|> Flow.from_enumerable(stages: 2, max_demand: 1)
|> Flow.map(fn scene_id ->
"/tmp/#{scene_id}"
|> File.ls!()
|> Enum.map(fn filename -> Path.join(["/tmp", scene_id, filename]) end)
|> NDVIClient.calc(frame)
end)
|> Enum.to_list()
結果のタブ表示
ndvi_list
|> Enum.map(fn {date, ndvi_tensor} ->
img =
ndvi_tensor
|> Evision.applyColorMap(Evision.Constant.cv_COLORMAP_WINTER())
{date, img}
end)
|> Kino.Layout.tabs()
速度比較
それぞれ、 NDVI 演算処理のセル実行時間を比較(単位は秒)
-
読込時の画像サイズ 640 * 640 の場合
処理形態 2並列 4並列 並列処理 15.5 49.0 並列分散処理 17.8 27.5 -
読込時にリサイズしない( 8,000 * 7,244 )の場合
処理形態 2並列 4並列 並列処理 69.0 70.1 並列分散処理 75.5 171.9
実行環境のメモリや CPU 、 GPU によって大きく変わってくると思いますが、画像が大きいとき、おそらく前処理、後処理のオーバーヘッドがかなり大きくなっていそうです