はじめに
Nx.Serving
を使うと、行列演算の一連処理を簡単に他ノードから実行できます
機械学習ではお馴染みのバッチサイズを指定することで、複数の行列を一度に処理することができます
今回も Livebook 上で実装してみます
実装したノートブックはこちら
セットアップ
Mix.install(
[
{:nx, "~> 0.5"},
{:kino, "~> 0.9"}
]
)
バッチサイズ = 1 (デフォルト)
まず、バッチサイズ = 1 の場合です
特に何も指定しません
今回は単に2倍する行列演算 &Nx.multiply(&1, 2)
を提供します
また、 Kino.start_child
で他ノードからもアクセスできるように子プロセスを開始します
serving_1 =
fn opts -> Nx.Defn.jit(&Nx.multiply(&1, 2), opts) end
|> Nx.Serving.new()
Kino.start_child({Nx.Serving, name: BatchSize1, serving: serving_1})
Nx.Serving
の演算処理を実行する場合、入力は Nx.Batch
を使います
batch = Nx.Batch.stack([Nx.tensor([1])])
実行結果は以下のようになります
Batch
の size
が 1 になっていますね
#Nx.Batch<size: 1, pad: 0, ...>
このバッチを入力として、演算処理を実行します
Nx.Serving.run(serving_1, batch)
実行結果は以下のようになります
#Nx.Tensor<
s64[1][1]
[
[2]
]
>
入力が Nx.Batch.stack([Nx.tensor([1])])
のとき、出力が Nx.tensor([[2]])
になりました
つまり、 Nx.Batch.stack([<バッチ1>])
が Nx.tensor([<バッチ1の演算結果>])
になっているわけです
子プロセスを呼び出す場合も同様です
Nx.Serving.batched_run(BatchSize1, batch)
実行結果は以下のようになります
#Nx.Tensor<
s64[1][1]
[
[2]
]
>
入力のバッチサイズが大きい場合
では、わざとバッチサイズを誤ってみます
batch = Nx.Batch.stack([Nx.tensor([1]), Nx.tensor([2])])
実行結果は以下のようになります
#Nx.Batch<size: 2, pad: 0, ...>
2 つテンソルを入れたのでバッチサイズは 2 になっています
ちなみに、以下のようにした場合も同じ値になります
batch =
Nx.Batch.stack([Nx.tensor([1])])
|> Nx.Batch.stack([Nx.tensor([2])])
では、これをバッチサイズ = 1 の Nx.Serving
に渡してみましょう
Nx.Serving.run(serving_1, batch)
この場合、実行結果は以下のようになります
#Nx.Tensor<
s64[2][1]
[
[2],
[4]
]
>
特に問題なく処理され、実行結果はNx.tensor([<バッチ1の演算結果>, <バッチ2の演算結果>])
になっています
では、子プロセスを呼んでみます
Nx.Serving.batched_run(BatchSize1, batch)
この場合、以下のようなエラーが発生します
** (ArgumentError) batch size (2) cannot exceed Nx.Serving server batch size of 1
(nx 0.5.3) lib/nx/serving.ex:640: Nx.Serving.local_batched_run/3
(nx 0.5.3) lib/nx/serving.ex:618: Nx.Serving.local_batched_run!/3
(stdlib 3.17.2) erl_eval.erl:685: :erl_eval.do_apply/6
(elixir 1.14.2) lib/module/parallel_checker.ex:107: Module.ParallelChecker.verify/1
Nx.Serving
のバッチサイズと入力のバッチサイズが異なるため、エラーになりました
直接 Nx.Serving
を実行する場合はバッチサイズが異なっても問題ありませんが、子プロセスを呼び出す場合はバッチサイズが同じでなければいけません
バッチサイズ = 2
Nx.Serving.process_options(batch_size: 2)
でバッチサイズを 2 に指定します
serving_2 =
fn opts -> Nx.Defn.jit(&Nx.multiply(&1, 2), opts) end
|> Nx.Serving.new()
|> Nx.Serving.process_options(batch_size: 2)
Kino.start_child({Nx.Serving, name: BatchSize2, serving: serving_2})
バッチサイズ 2 の入力を用意します
batch = Nx.Batch.stack([
Nx.tensor([1]),
Nx.tensor([2])
])
実行します
Nx.Serving.run(serving_2, batch)
実行結果は以下のようになりました
#Nx.Tensor<
s64[2][1]
[
[2],
[4]
]
>
子プロセスを呼ぶ場合も同様です
Nx.Serving.batched_run(BatchSize2, batch)
実行結果は以下のようになりました
#Nx.Tensor<
s64[2][1]
[
[2],
[4]
]
>
入力のバッチサイズが小さい場合
バッチサイズ 2 の Nx.Serving
にバッチサイズ 1 の入力を渡してみましょう
Nx.Serving.batched_run(BatchSize2, Nx.Batch.stack([Nx.tensor([1])]))
問題なく実行できました
#Nx.Tensor<
s64[1][1]
[
[2]
]
>
バッチサイズ 2 の Nx.Serving
にバッチサイズ 3 の入力を渡してみましょう
Nx.Serving.batched_run(BatchSize2, Nx.Batch.stack([Nx.tensor([1]), Nx.tensor([2]), Nx.tensor([3])]))
この場合はエラーになります
** (ArgumentError) batch size (3) cannot exceed Nx.Serving server batch size of 2
(nx 0.5.3) lib/nx/serving.ex:640: Nx.Serving.local_batched_run/3
(nx 0.5.3) lib/nx/serving.ex:618: Nx.Serving.local_batched_run!/3
(stdlib 3.17.2) erl_eval.erl:685: :erl_eval.do_apply/6
(elixir 1.14.2) lib/module/parallel_checker.ex:107: Module.ParallelChecker.verify/1
ちなみに、バッチサイズ 3 の Nx.Serving
にバッチサイズ 2 の入力を渡しても問題ありません
まとめ
-
Nx.Serving.process_options(batch_size: <バッチサイズ>)
でバッチサイズが指定できます - 入力と
Nx.Serving
のバッチサイズに関係なく、Nx.Serving.run
では演算処理が実行できる -
Nx.Serving.batched_run
の場合- 入力のバッチサイズが
Nx.Serving
のバッチサイズ以下の場合、演算処理が実行できる - 入力のバッチサイズが
Nx.Serving
のバッチサイズより大きい場合、エラーが発生する
- 入力のバッチサイズが