SPRESENSEで Tensorflow lite/micro の顔認識サンプルを動かしてみました。しかし、せっかくなら自分で用意したデータセットで画像認識を行いたいものです。そこで、今回は独自のデータセット(じゃんけん)で学習済モデルを生成して、SPRESENSEカメラで認識させることをしたいと思います。
自前のデータセットの制作
学習用のデータセットは、SPRESENSEで取得した画像をモノクロ化し縮小したものを使います。データは背景ノイズ(Ohters)、グー(Rock)、チョキ(Scissors)、パー(Paper)の四種類です。
- ”背景ノイズ”のデータセットの一部抜粋
- ”グー”のデータセットの一部抜粋
- ”チョキ”のデータセットの一部抜粋
- ”パー”のデータセットの一部抜粋
これらのデータを、それぞれ画像へのパスとラベルに対応させたCSVテキストに一覧にします。
./Others/PICT1000.png,0
./Others/PICT1001.png,0
./Others/PICT1002.png,0
./Others/PICT1003.png,0
./Others/PICT1004.png,0
./Others/PICT1005.png,0
....
./Paper/PICT1747.png,1
./Paper/PICT1748.png,1
./Paper/PICT1749.png,1
./Paper/PICT1750.png,1
./Paper/PICT1751.png,1
....
./Rock/PICT000.png,2
./Rock/PICT001.png,2
./Rock/PICT003.png,2
./Rock/PICT004.png,2
./Rock/PICT005.png,2
.....
./Scissors/PICT1697.png,3
./Scissors/PICT1698.png,3
./Scissors/PICT1699.png,3
./Scissors/PICT1700.png,3
./Scissors/PICT1701.png,3
....
これを評価用のデータセットも同じように作成します。
学習用データセット一覧データ:rsp_training.txt
評価用データセット一覧データ:rsp_valdation.txt
(データセットなどはそのうちGithub上に公開します)
Tensorflow で学習済モデルを出力する
データセットが出来たので、次は Tensorflow でニューラルネットワークを設計し学習をさせます。Jupyter Notebook で開発したので処理ブロック毎に解説をしていきます。
最初はライブラリのインポート部です。メッセージを少し抑制する処理もいれています。
import sys
import tensorflow as tf
from tensorflow import keras
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# To silent verbose
tf.autograph.set_verbosity(0)
import logging
logging.getLogger("tensorflow").setLevel(logging.ERROR)
次にデータセットの読み込みを行います。ここは少しやっかいなコードになってしまいました。シャッフルしたいだけなんですが、わざわざデータセット配列に画像とラベル入れてあげて、シャッフルし終わったら、numpy の配列に戻す、という無駄な処理になってしまいました。もう少しエレガントにやれると思うのですが、取り急ぎの解決策ということで。
t_df = pd.read_csv('./dataset/training/rps_training.txt', header=None)
t_images_path = t_df.iloc[:,0]
t_labels = t_df.iloc[:,1]
v_df = pd.read_csv('./dataset/validation/rps_validation.txt', header=None)
v_images_path = v_df.iloc[:,0]
v_labels =v_df.iloc[:,1]
# read labels
t_labels = tf.convert_to_tensor(t_labels)
t_labels = tf.keras.utils.to_categorical(t_labels, 4)
v_labels = tf.convert_to_tensor(v_labels)
v_labels = tf.keras.utils.to_categorical(v_labels, 4)
# read image paths
for i in range(len(t_images_path)):
t_images_path[i] = './dataset/training' + t_images_path[i][1:]
t_images_path = [str(path) for path in t_images_path]
for i in range(len(v_images_path)):
v_images_path[i] = './dataset/validation' + v_images_path[i][1:]
v_images_path = [str(path) for path in v_images_path]
t_img_path_ds = tf.data.Dataset.from_tensor_slices(t_images_path)
v_img_path_ds = tf.data.Dataset.from_tensor_slices(v_images_path)
# define the function to normalize images from 0-255 to 0-1.0
def load_and_preprocess_from_path(path):
image = tf.io.read_file(path)
image = tf.image.decode_image(image, channels=1,expand_animations=False)
image = tf.image.resize(image, [28, 28])
image /= 255.0 # normalize to [0,1] range
return image
# load image objects
# t_images_ds: image dataset for training
# v_images_ds: image dataset for validation
# This process requires a normalization process for the images
AUTOTUNE = tf.data.experimental.AUTOTUNE
t_images_ds = t_img_path_ds.map(load_and_preprocess_from_path, num_parallel_calls=AUTOTUNE)
t_images_ds.element_spec
t_images_ds.cardinality()
v_images_ds = v_img_path_ds.map(load_and_preprocess_from_path, num_parallel_calls=AUTOTUNE)
v_images_ds.element_spec
v_images_ds.cardinality()
# to put labels to dataset
t_labels_ds = tf.data.Dataset.from_tensor_slices(t_labels)
v_labels_ds = tf.data.Dataset.from_tensor_slices(v_labels)
# combine datasets of images and labels
t_image_label_ds = tf.data.Dataset.zip((t_images_ds, t_labels_ds))
v_image_label_ds = tf.data.Dataset.zip((v_images_ds, v_labels_ds))
# shuffle the datasets
t_ds = t_image_label_ds.shuffle(buffer_size=len(t_image_label_ds))
v_ds = v_image_label_ds.shuffle(buffer_size=len(v_image_label_ds))
# convert datasets to numpy arrays
t_np_images = np.empty((0,28,28), dtype=float)
t_np_labels = np.empty((0,4), dtype=int)
for img, lbl in t_ds.take(len(t_ds)):
img = img.numpy().reshape(1,28,28)
lbl = lbl.numpy().reshape(1,4)
t_np_images = np.append(t_np_images, img, axis=0)
t_np_labels = np.append(t_np_labels, lbl, axis=0)
v_np_images = np.empty((0,28,28), dtype=float)
v_np_labels = np.empty((0,4), dtype=int)
for img, lbl in v_ds.take(len(v_ds)):
img = img.numpy().reshape(1,28,28)
lbl = lbl.numpy().reshape(1,4)
v_np_images = np.append(v_np_images, img, axis=0)
v_np_labels = np.append(v_np_labels, lbl, axis=0)
ニューラルネットワークを定義します。シンプルな畳み込みニューラルネットワークを使いました。
model = keras.Sequential([
keras.layers.InputLayer(input_shape=(28, 28)),
keras.layers.Reshape(target_shape=(28, 28, 1)),
keras.layers.Conv2D(
filters=6, kernel_size=(5, 5), padding='same', activation=tf.nn.relu, name="conv2d_6"),
keras.layers.MaxPooling2D(pool_size=(2, 2), padding='same'),
keras.layers.Flatten(),
keras.layers.Dense(32, activation=tf.nn.relu, name="dense_32"),
keras.layers.Dense(4),
keras.layers.Activation(tf.nn.softmax)
])
#model.compile(optimizer='adam', loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()
先ほど生成した numpy に変換したデータセットで学習を行います。
batch_size = 32
epochs = 100
model.fit(x=t_np_images, y=t_np_labels, batch_size=batch_size, epochs=epochs, verbose=1, validation_split=0.1)
_, test_accuracy = model.evaluate(x=v_np_images, y=v_np_labels, verbose=1)
print('test accuracy = %f' % test_accuracy)
学習済モデルを "modeul.tflite" というファイルに一旦出力します。
# Convert the model.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
# Show model size in KBs.
tflite_model_size = len(tflite_model) / 1024
print('Original model size = %dKBs.' % tflite_model_size)
# Save the model to disk
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
サイズが 151kB と少し大きいので最適化して小さくします。出力した”model.tflite”をもう一度読み込み最適化処理を加えます。
interpreter = tf.lite.Interpreter('model.tflite')
interpreter.allocate_tensors()
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset_gen
tflite_model = converter.convert()
# Show model size in KBs.
tflite_model_size = len(tflite_model) / 1024
print('Quantized model size = %dKBs.' % tflite_model_size)
# Save the model to disk
open('qmodel.tflite', "wb").write(tflite_model)
サイズが 42kB と小さくなりました。生成した学習済モデル「qmodel.tflite」の中身をテキストに変換し、ヘッダーファイル "qmodel.h" として出力します。これをスケッチからインクルードし活用します。
import binascii
def convert_to_c_array(bytes) -> str:
hexstr = binascii.hexlify(bytes).decode("UTF-8")
hexstr = hexstr.upper()
array = ["0x" + hexstr[i:i + 2] for i in range(0, len(hexstr), 2)]
array = [array[i:i+10] for i in range(0, len(array), 10)]
return ",\n ".join([", ".join(e) for e in array])
tflite_binary = open('qmodel.tflite', 'rb').read()
ascii_bytes = convert_to_c_array(tflite_binary)
header_file = "const unsigned char model_tflite[] = {\n " + ascii_bytes + "\n};\nunsigned int model_tflite_len = " + str(len(tflite_binary)) + ";"
# print(c_file)
with open("qmodel.h", "w") as f:
f.write(header_file)
SPRESENSEに学習済モデルを組込む
SPRESENSEのカメラから画像を取得して認識するまでのコードを次に示します。先ほど生成した”qmodel.h”をスケッチと同じフォルダーにコピーして使ってください。
このコードはTensorflow lite/micro をセットアップしカメラのストリーミング機能を動作させます。カメラで撮影した画像は 320x240 の YUV422 の画像なので、データセットに合うように 28x28 のモノクロ画像に変換し、生成した学習済モデルで推論させます。
コンパイルの際には、メインコアのメモリ割り当てを 1152kB にしてください。
#include <Camera.h>
#include "Adafruit_GFX.h"
#include "Adafruit_ILI9341.h"
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/system_setup.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "qmodel.h"
tflite::ErrorReporter* error_reporter = nullptr;
const tflite::Model* model = nullptr;
tflite::MicroInterpreter* interpreter = nullptr;
TfLiteTensor* input = nullptr;
TfLiteTensor* output = nullptr;
int inference_count = 0;
constexpr int kTensorArenaSize = 100000;
uint8_t tensor_arena[kTensorArenaSize];
#define DNN_IMG_W 28
#define DNN_IMG_H 28
#define CAM_IMG_W 320
#define CAM_IMG_H 240
#define CAM_CLIP_X 48
#define CAM_CLIP_Y 8
#define CAM_CLIP_W 224
#define CAM_CLIP_H 224
#define TFT_RST 8
#define TFT_DC 9
#define TFT_CS 10
Adafruit_ILI9341 tft = Adafruit_ILI9341(TFT_CS ,TFT_DC ,TFT_RST);
uint16_t disp[target_w*target_h];
void disp_image(uint16_t* buf, int w, int h) {
for (int n = 0; n < w*h; ++n) {
uint16_t value = buf[n];
uint16_t y_h = (value & 0xf000) >> 8;
uint16_t y_l = (value & 0x00f0) >> 4;
value = (y_h | y_l);
uint16_t value6 = (value >> 2);
uint16_t value5 = (value >> 3);
disp[n] = (value5 << 11) | (value6 << 5) | value5;
}
tft.drawRGBBitmap(0, 0, disp, w, h);
}
void CamCB(CamImage img) {
static uint32_t last_mills = 0;
if (!img.isAvailable()) {
Serial.println("img is not available");
return;
}
int sx = CAM_CLIP_X;
int sy = CAM_CLIP_Y;
int ex = CAM_CLIP_X + CAM_CLIP_W -1;
int ey = CAM_CLIP_Y + CAM_CLIP_H -1;
CamImage small;
CamErr err = img.clipAndResizeImageByHW(small, sx, sy, ex, ey, DNN_IMG_W, DNN_IMG_H);
if (!small.isAvailable()){
Serial.println("Clip and Resize CamImage failed (CamErr) : " + String(err));
return false;
}
uint16_t* buf = (uint16_t*)small.getImgBuff();
for (int i = 0; i < DNN_IMG_W*DNN_IMG_H; ++i) {
uint16_t value = buf[i];
uint16_t y_h = (value & 0xf000) >> 8;
uint16_t y_l = (value & 0x00f0) >> 4;
value = (y_h | y_l);
input->data.f[i] = (float)(value)/255.0;
}
TfLiteStatus invoke_status = interpreter->Invoke();
if (invoke_status != kTfLiteOk) {
Serial.println("Invoke failed");
return;
}
for (int n = 0; n < 4; ++n) {
float value = output->data.f[n];
Serial.println("score[" + String(n) +"] " + String(value));
}
disp_image(buf, DNN_IMG_W, DNN_IMG_H);
}
void setup() {
Serial.begin(115200);
tft.begin();
tft.setRotation(3);
tflite::InitializeTarget();
memset(tensor_arena, 0, kTensorArenaSize*sizeof(uint8_t));
// Set up logging.
static tflite::MicroErrorReporter micro_error_reporter;
error_reporter = µ_error_reporter;
// Map the model into a usable data structure..
model = tflite::GetModel(model_tflite);
if (model->version() != TFLITE_SCHEMA_VERSION) {
Serial.println("Model provided is schema version "
+ String(model->version()) + " not equal "
+ "to supported version "
+ String(TFLITE_SCHEMA_VERSION));
return;
} else {
Serial.println("Model version: " + String(model->version()));
}
// This pulls in all the operation implementations we need.
static tflite::AllOpsResolver resolver;
// Build an interpreter to run the model with.
static tflite::MicroInterpreter static_interpreter(
model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
interpreter = &static_interpreter;
// Allocate memory from the tensor_arena for the model's tensors.
TfLiteStatus allocate_status = interpreter->AllocateTensors();
if (allocate_status != kTfLiteOk) {
Serial.println("AllocateTensors() failed");
return;
} else {
Serial.println("AllocateTensor() Success");
}
size_t used_size = interpreter->arena_used_bytes();
Serial.println("Area used bytes: " + String(used_size));
input = interpreter->input(0);
output = interpreter->output(0);
Serial.println("Model input:");
Serial.println("dims->size: " + String(input->dims->size));
for (int n = 0; n < input->dims->size; ++n) {
Serial.println("dims->data[" + String(n) + "]: " + String(input->dims->data[n]));
}
Serial.println("Model output:");
Serial.println("dims->size: " + String(output->dims->size));
for (int n = 0; n < output->dims->size; ++n) {
Serial.println("dims->data[" + String(n) + "]: " + String(output->dims->data[n]));
}
Serial.println("Completed tensorflow setup");
theCamera.begin();
CamErr err = theCamera.startStreaming(true, CamCB);
if (err != CAM_ERR_SUCCESS) {
Serial.println("start streaming err: " + String(err));
return;
}
}
void loop() {
}
作ってみた感想
久しぶりに Tensorflow を使ったのですが、結構メモリを食うなというのが第一印象です。しかも、データセットを自前で作成するのが、かなり面倒くさい。正直、エッジAIで扱えるニューラルネットワークなら Neural Network Console もしくは Neural Network Libraries のほうがメモリ効率はよいですし、データセットの取り扱いも楽でよいなと思いました。(しかも国産ですし)
しかし、Tensorflow はいろいろなプラットフォームに対応していますし、豊富な資産を使えるのも大きな魅力です。SPRESENSEはせっかく二つのソリューションが使えるので場面場面でうまく使い分けていきたいなと思います。