SPRESENSEのSDKバージョンが2.5.0になって Tensorflow 2.8.0 に対応したので、プルーニング(枝刈り)を試してみました。SPRESENSSE でTensorflow を使うには専用のArduino Package をインストールする必要があります。次のサイトからインストールしてください。
プルーニング(枝刈り)とは
プルーニングとは、ニューラルネットワークのつながりの中で関連が薄い(重み付け係数が小さい)ものを刈り取って計算量を減らす処理のことです。重み付けがゼロになるので認識速度の向上が期待できます。
ここではTensorflow でプルーニング処理をして、SPRESENSEで動かして効果を確認してみたいと思います。
例題の畳み込みニューラルネットワーク
例題のニューラルネットワークは Jupyter notebook を使う人が多いと思いますので、処理毎に分割をしています。
ライブラリのインポート
Tensorflow をインポートします。バージョンを"2.8.0"にしたら WARNING がたくさん出てきて煩いので、set_verbosity(0) とsetLevel(logging.ERROR) で 抑制しています。
import tensorflow as tf
from tensorflow import keras
import logging
# To silent verbose
tf.autograph.set_verbosity(0)
logging.getLogger("tensorflow").setLevel(logging.ERROR)
MNISTのダウンロードと正規化
データセットにMNISTを用います。出力は CategoricalCrossEntropy を使用したので、10 出力に変更しています。例えば、ラベルが ['3'] の場合は、出力のフォーマットを ['0','0','0','1','0','0','0','0','0','0'] に変更します。
# Load MNIST dataset.
mnist = keras.datasets.mnist
(train_images, y_train_labels), (test_images, y_test_labels) = mnist.load_data()
# Normalize the input image so that each pixel value is between 0 and 1.
train_images = train_images / 255.0
test_images = test_images / 255.0
train_labels = tf.keras.utils.to_categorical(y_train_labels, 10)
test_labels = tf.keras.utils.to_categorical(y_test_labels, 10)
MODEL の定義と学習の実行
モデルは次のように定義しました。一般的な畳み込みニューラルネットワークです。ここでは、学習の実行まで行っています。
# Model definition
model = keras.Sequential([
keras.layers.InputLayer(input_shape=(28, 28)),
keras.layers.Reshape(target_shape=(28, 28, 1)),
keras.layers.Conv2D(
filters=6, kernel_size=(5, 5), padding='same', activation=tf.nn.relu, name="conv2d_6"),
keras.layers.MaxPooling2D(pool_size=(2, 2), padding='same'),
keras.layers.Flatten(),
keras.layers.Dense(32, activation=tf.nn.relu, name="dense_32"),
keras.layers.Dense(10),
keras.layers.Activation(tf.nn.softmax)
])
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()
# Training the model
model.fit(x=train_images, y=train_labels, batch_size=128, epochs=30, verbose=1, validation_split=0.1)
# Output accuracy
_, test_accuracy = model.evaluate(x=test_images, y=test_labels, verbose=1)
print('test accuracy = %f' % test_accuracy)
学習したモデルを Tensorflow Lite 形式に変換
学習したモデルを Tensorflow Lite 形式に変換して、サイズを確認しディスクに一旦保存をします。
# Convert the model to a tflite model.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
# Show the tflite model size in KBs.
tflite_model_size = len(tflite_model) / 1024
print('Original model size = %dKBs.' % tflite_model_size)
# Save the model to disk
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
学習済モデルをCスタイルのヘッダーに出力
Tensorflow Lite 形式の学習済モデルをCスタイルのヘッダーファイルに出力します。このヘッダーファイルはSPRESENSEのスケッチと同じところにおいてください。
# Output the tflite model to C-style header
import binascii
def convert_to_c_array(bytes) -> str:
hexstr = binascii.hexlify(bytes).decode("UTF-8")
hexstr = hexstr.upper()
array = ["0x" + hexstr[i:i + 2] for i in range(0, len(hexstr), 2)]
array = [array[i:i+10] for i in range(0, len(array), 10)]
return ",\n ".join([", ".join(e) for e in array])
tflite_binary = open('model.tflite', 'rb').read()
ascii_bytes = convert_to_c_array(tflite_binary)
header_file = "const unsigned char model_tflite[] = {\n " + ascii_bytes + "\n};\nunsigned int model_tflite_len = " + str(len(tflite_binary)) + ";"
# print(c_file)
with open("model.h", "w") as f:
f.write(header_file)
SPRESENSE用Arduinoスケッチ
プルーニングの説明の前に出力した学習済モデルをSPRESENSEで動かすためのArduinoスケッチについて説明します。このスケッチは、後述する量子化モデル、プルーニング済の量子化モデル共通で使用することができます。
SPRESENSE用BMPライブラリのインストール
このスケッチでは、画像認識のために画像ライブラリを使います。次のサイトからSPRESENSE用のBMPライブラリをダウンロードし解凍し、Arduino/libraries
フォルダに入れれば利用できます。
テスト用画像をSPRESENSEのフラッシュに書き込む
テスト用の画像は次のものを使いました。ダウンロードしてペイントなどでBMPに変換して使ってください。変換の際は256色で保存をしてください。
これを xmodem_writer
で本体に書き込みを行います。xmodem_writerは、それぞれのOS向けに準備されています。
Windows用*
https://github.com/sonydevworld/spresense/blob/master/sdk/tools/windows/xmodem_writer.exe
Linux用
https://github.com/sonydevworld/spresense/blob/master/sdk/tools/linux/xmodem_writer
macOS用
https://github.com/sonydevworld/spresense/blob/master/sdk/tools/macos/xmodem_writer
次のコマンドでSpresense本体のフラッシュにデータを書き込めます。
$ xmodem_writer -c COM3 0009.bmp
Tensorflow lite/micro を動かすSPRESENSE用のスケッチ
SPRESENSEで動かすためのスケッチを示します。Pythonで出力したmodel.h
をスケッチと同じフォルダに入れます。また、後で量子化モデル、プルーニング量子化モデルと比較するためそれぞれのヘッダーも記述しています。動作確認の時にコメントアウトを変更して測定します。
Tensorflow用のメモリを確保するkTensorArenaSize
はモデル毎に適切な値を設定する必要があります。最初は大きめに設定しておきましょう。使用しているメモリ量は interpreter->arena_used_bytes()
で確認できます。
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/system_setup.h"
#include "tensorflow/lite/schema/schema_generated.h"
//#include "pmodel.h" /* pruned model */
//#include "qmodel.h" /* quantized model */
#include "model.h" /* float model */
#define TEST_FILE "0003.bmp"
tflite::ErrorReporter* error_reporter = nullptr;
const tflite::Model* model = nullptr;
tflite::MicroInterpreter* interpreter = nullptr;
TfLiteTensor* input = nullptr;
TfLiteTensor* output = nullptr;
int inference_count = 0;
constexpr int kTensorArenaSize = 30000;
uint8_t tensor_arena[kTensorArenaSize];
#include <Flash.h>
#include <BmpImage.h>
BmpImage bmp;
void setup() {
Serial.begin(115200);
tflite::InitializeTarget();
memset(tensor_arena, 0, kTensorArenaSize*sizeof(uint8_t));
// Set up logging.
static tflite::MicroErrorReporter micro_error_reporter;
error_reporter = µ_error_reporter;
// Map the model into a usable data structure..
model = tflite::GetModel(model_tflite);
if (model->version() != TFLITE_SCHEMA_VERSION) {
Serial.println("Model provided is schema version "
+ String(model->version()) + " not equal "
+ "to supported version "
+ String(TFLITE_SCHEMA_VERSION));
return;
} else {
Serial.println("Model version: " + String(model->version()));
}
// This pulls in all the operation implementations we need.
static tflite::AllOpsResolver resolver;
// Build an interpreter to run the model with.
static tflite::MicroInterpreter static_interpreter(
model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
interpreter = &static_interpreter;
// Allocate memory from the tensor_arena for the model's tensors.
TfLiteStatus allocate_status = interpreter->AllocateTensors();
if (allocate_status != kTfLiteOk) {
Serial.println("AllocateTensors() failed");
return;
} else {
Serial.println("AllocateTensor() Success");
}
size_t used_size = interpreter->arena_used_bytes();
Serial.println("Area used bytes: " + String(used_size));
input = interpreter->input(0);
output = interpreter->output(0);
/* check input */
if (input->type != kTfLiteFloat32) {
Serial.println("input type mismatch. expected input type is float32");
return;
} else {
Serial.println("input type is float32");
}
Serial.println("Model input:");
Serial.println("input->type: " + String(input->type));
Serial.println("dims->size: " + String(input->dims->size));
for (int n = 0; n < input->dims->size; ++n) {
Serial.println("dims->data[n]: " + String(input->dims->data[n]));
}
Serial.println("Model output:");
Serial.println("dims->size: " + String(output->dims->size));
for (int n = 0; n < output->dims->size; ++n) {
Serial.println("dims->data[n]: " + String(output->dims->data[n]));
}
/* read test data */
File myFile = Flash.open(TEST_FILE);
if (!myFile) { Serial.println(TEST_FILE " not found"); return; }
Serial.println("Read " TEST_FILE);
bmp.begin(myFile);
BmpImage::BMP_IMAGE_PIX_FMT fmt = bmp.getPixFormat();
if (fmt != BmpImage::BMP_IMAGE_GRAY8) {
Serial.println("support format error");
return;
}
int width = bmp.getWidth();
int height = bmp.getHeight();
Serial.println("width: " + String(width));
Serial.println("height: " + String(height));
uint8_t* img = bmp.getImgBuff();
for (int i = 0; i < width*height; ++i) {
input->data.f[i] = (float)(img[i]/255.0);
}
Serial.println("Do inference");
uint32_t start_time = micros();
TfLiteStatus invoke_status = interpreter->Invoke();
if (invoke_status != kTfLiteOk) {
Serial.println("Invoke failed");
return;
}
uint32_t duration = micros() - start_time;
Serial.println("Inference time = " + String(duration));
for (int n = 0; n < 10; ++n) {
float value = output->data.f[n];
Serial.println("[" + String(n) + "] " + String(value));
}
}
void loop() { }
Kerasで量子化モデルを生成する
プルーニングを行う前に比較用に量子化モデルを出力するようにします。TFLiteConverter の部分を次のように変更します。
# Convert Keras model to TF Lite format.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
def representative_dataset_gen():
for i in range(100):
input_image = tf.cast(test_images[i], tf.float32)
input_image = tf.reshape(input_image, [1,28,28])
yield ([input_image])
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset_gen
tflite_model = converter.convert()
# Show model size in KBs.
tflite_model_size = len(tflite_model) / 1024
print('Quantized model size = %dKBs.' % tflite_model_size)
# Save the model to disk
open('qmodel.tflite', "wb").write(tflite_model)
プルーニングされた学習済モデルを生成する
いよいよ、学習済モデルにプルーニングを行いそこからプルーニングされた学習モデルを生成します。
プルーニング用パラメータの設定。
プルーニングは学習済モデルから、tensorflow_model_optimaization を用いてプルーニング処理用のモデルを生成します。ここではプルーニング用のパラメータを設定しています。最初は50%のプルーニングから初めて最大80%までをターゲットに設定しました。
import tensorflow_model_optimization as tfmot
import tempfile
prune_low_magnitude = tfmot.sparsity.keras.prune_low_magnitude
# Compute end step to finish pruning after 30 epochs.
end_epoch = 30
num_iterations_per_epoch = len(train_images)
end_step = num_iterations_per_epoch * end_epoch
# Define parameters for pruning.
pruning_params = {
'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(initial_sparsity=0.5, final_sparsity=0.80, begin_step=0, end_step=end_step),
}
# Try to apply pruning wrapper with pruning policy parameter.
model_for_pruning = prune_low_magnitude(model, **pruning_params)
プルーニング用モデルで再学習を行う
プルーニング用モデルを使って再学習を行います。この過程で関連の薄いネットワークの重みをゼロに置き換えていきます。
model_for_pruning.compile(loss="categorical_crossentropy", optimizer='adam', metrics=['accuracy'])
model_for_pruning.fit(
x=train_images,
y=train_labels,
epochs=end_epoch,
validation_split=0.1,
callbacks=tfmot.sparsity.keras.UpdatePruningStep()
)
# Evaluate the model for pruning.
pruned_test_loss, pruned_test_acc = model_for_pruning.evaluate(test_images, test_labels, verbose=0)
print('Pruned model accuracy:', pruned_test_acc)
プルーニング用のラッパーを削除する
プルーニング直後には参照用のオリジナルの重み付け係数とプルーニング後の重み付け係数が含まれているため、サイズが倍になってしまっています。次の処理で参照用の重み付け係数を削除します。
# Remove the pruning wrapper so that it is not included in the model
model = tfmot.sparsity.keras.strip_pruning(model)
プルーニング済の学習済モデルを出力する
ここまでできたら、プルーニング済の学習済モデルを Tensorflow Lite 形式に出力します。量子化モデルを出力することもできます。
プルーニングの効果を確認する
プルーニングの効果を確認するために最後のDense(32)レイヤーの重みデータを出力してみました。左がプルーニング前のDenseレイヤーの一部の重み付け係数の値を示しています。右がプルーニング後の状態です。これを見ても分かるように、かなりの部分が重み付け係数がゼロになっているのが分かると思います。図は正方形になっていますが、あまり意味はありませんので念のため。
プルーニングの効果を測定する
ここではプルーニングの効果を検証します。サイズ、認識率、処理速度 の観点で比較をしてみました。
サイズの比較
Tensorflow Lite 形式の量子化モデルとプルーニング後の量子化モデルを比較してみました。これを見てもわかりますが、バイナリ形式のサイズは変わりません。重み付けをゼロに置き換えているだけだからです。
量子化モデル | プルーニング後 | |
---|---|---|
バイナリ形式 | 42 KBs | 42 KBs |
圧縮後 | 33 KBs | 26 KBs |
圧縮をすると22%ほど削減することができました。プルーニングは学習済モデルを伝送路などで送信するときに威力が発揮できます。Arduinoでもバイナリをテキストに変換する処理を加えればプルーニングの恩恵に預かれそうです。
認識率の比較
認識率の観点でオリジナル、量子化モデル、プルーニング後(量子化モデル)を比較してみました。量子化モデルとプルーニング後の認識率にほぼ変わりはないので、プルーニングによる認識率低下はなさそうです。
オリジナル | 量子化モデル | プルーニング後 | |
---|---|---|---|
認識率 | 0.985100 | 0.984100 | 0.984900 |
処理速度の比較
オリジナルの処理速度と量子化モデル、プルーニング後の処理速度を比較しました。SPRESENSEの場合、若干の処理速度の向上がありました。プルーニングの処理を増やせば処理速度を向上させることができるかも知れませんが、計算能力が高いSpresenseでは、その効果は限定的です。もっと計算能力の低いマイコンだと効果があるかも知れません。
オリジナル | 量子化モデル | プルーニング後 | |
---|---|---|---|
処理速度 | 70128 μSec | 32043 μSec | 31951 μSec |
使ってみた感想
正直なところ、処理速度の面でプルーニングの効果があるかなと思いましたが、それほど大きな効果はありませんでした。プルーニングによる効果はバイナリの学習済モデルの圧縮による効果がもっとも大きいという結果となりました。
学習済モデルをネットワークで送信するようなクラウドxエッジAIシステムでは効果がありそうですが、実験的なシステムの場合はあまり使う場面はなさそうです。こういう手段もあるということを頭に入れておくくらいで良さそうですね。苦労した割には、得られるものが少なかったなぁ…