0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

自動運転AIチャレンジ2025に参加する中でAI教材の活用が推奨されていたので早速チャレンジ

しようと思ったのだが、これとは別のアプローチでやりたいことは実現できるんじゃないか?ということでこの記事を執筆

前提

(私の理解が間違ってなければ)この教材の中ではAWSIMの中でGeminiを使ったVLMを活用することで軌跡データを出力しよう、というアプローチなのだが、あえてここではVLMではなく畳み込みニューラルネットワークを使うアプローチを試してみた。というのも自動運転AIチャレンジを進めていく中で学習データとして使えると思わしきデータは割と溜まっており、LLM至上主義に対抗する意味でもちょっとチャレンジしてみようかな、という意図で開発を進めた。
なお、このニューラルネットワークが動画データから学習する項目は以下の5つ

・走行時のX座標
・走行時のY座標
・走行時のZ座標
・走行時のz_quat(カートの角度)
・走行時のw_quat(カートの角度)

x_quat, y_quatに関してはどうやら0になっていることが前提のようだったのでこちらの学習ではオミットすることとした。
こちらの学習した結果を元にAIがCSVファイルに書き込み、軌跡データを出力することを目指す。
なお、記事の後半で記述されるコードにはカートのスピードも学習済みという前提でCSVファイルに記載する方法が書かれているが、こちらの学習についてはデータ収集のやり方が違っているので以下の別記事で記載。

実装1 (収集されたデータのnpyファイルへの変換)

事前に収集、仕分けされたデータを以下のコードでNN(ニューラルネットワーク)の学習のための学習データ、ラベルデータとなるnpyファイルに変換する。データの収集・仕分けの方法は以下記事で記載。

こちらと同じ手順をY座標、Z座標、z_quat、w_quatの学習データに対しても同様に行う。
なお、こちらの変換の際のフォルダ構成は以下

入力するデータのフォルダ構成(X座標のデータの場合)

データを格納したフォルダ
 └X座標データを格納したフォルダ
  └座標別フォルダ(e.g. 84556)
  |-85667
  |-85668
  |-....

出力するデータのフォルダ構成(X座標以下共通)

v2t
 └1_data
  └new
   |-train_data.npy
   └train_label.npy

npyファイルの変換コード

# -*- coding: utf-8 -*-
import os
from pathlib import Path
import json
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical  # np_utils 互換
from tensorflow.keras.utils import load_img, img_to_array, array_to_img

# ========= 設定 =========
ROOT_DIR = Path(r"/path/to/data/x")   # ← Xフォルダ(この直下に複数フォルダがある前提)
OUT_DIR  = Path(r"/path/to/1_data/new")  # npy等の出力先
OUT_DIR.mkdir(parents=True, exist_ok=True)

IMG_SIZE = (64, 64) #イメージサイズの設定
COLOR_MODE = "rgb"      # "rgb" か "grayscale"
DTYPE = "float32"             # 出力配列のdtype
FILE_PATTERNS = ("*.jpeg", "*.jpg", "*.png")  # 読み込む拡張子

# 水増し設定(0なら水増しなし)
AUG_PER_IMAGE = 0
datagen = ImageDataGenerator(
    rotation_range=0,
    width_shift_range=0.0,
    height_shift_range=0.0,
    zoom_range=0.1,
    # horizontal_flip=True,
    # vertical_flip=True,
)

# ========= ここから処理 =========
def iter_image_paths(folder: Path):
    for pat in FILE_PATTERNS:
        for p in folder.glob(pat):
            if p.is_file():
                yield p

def load_image_as_array(path: Path):
    # Keras load_img: color_mode="grayscale" | "rgb"
    img = load_img(str(path), color_mode=COLOR_MODE, target_size=IMG_SIZE)
    arr = img_to_array(img)  # HxWx1 or HxWx3
    return arr

def main():
    X_list = []
    y_list = []
    class_index = {}   # {"folder_name": class_id}

    # サブフォルダをクラスとして走査
    subfolders = sorted([p for p in ROOT_DIR.iterdir() if p.is_dir()])

    if not subfolders:
        raise RuntimeError(f"No subfolders found under: {ROOT_DIR}")

    for cid, cls_dir in enumerate(subfolders):
        class_index[cls_dir.name] = cid
        print("cid: " + str(cid))
        imgs = list(iter_image_paths(cls_dir))
        if not imgs:
            print(f"[WARN] no images in: {cls_dir}")
            continue

        print(f"[{cls_dir.name}] files={len(imgs)}  -> class_id={cid}")

        for img_path in imgs:
            try:
                arr = load_image_as_array(img_path)  # (H, W, C)
            except Exception as e:
                print(f"  [SKIP] {img_path} -> {e}")
                continue

            X_list.append(arr)
            y_list.append(cid)

            # ---- 水増し(メモリ内)----
            if AUG_PER_IMAGE > 0:
                x = np.expand_dims(arr, axis=0)  # (1, H, W, C)
                g = datagen.flow(x, batch_size=1, shuffle=False)
                for _ in range(AUG_PER_IMAGE):
                    aug = next(g)[0]  # (H, W, C)
                    X_list.append(aug)
                    y_list.append(cid)

    if not X_list:
        raise RuntimeError("No images collected. Check ROOT_DIR / patterns / structure.")

    X = np.stack(X_list).astype(DTYPE)   # (N, H, W, C)
    y = np.array(y_list, dtype=np.int64)
    y = to_categorical(y, num_classes=len(class_index))

    # 正規化など必要ならここで
    # X /= 255.0

    # 保存
    np.save(OUT_DIR / "train_data_x.npy", X) #画像からX座標を読み取るための学習データを作成
    np.save(OUT_DIR / "train_label_x.npy", y) #画像からX座標を読み取るためのラベルデータを作成
    with open(OUT_DIR / "class_index.json", "w", encoding="utf-8") as f:
        json.dump(class_index, f, ensure_ascii=False, indent=2)

    print(f"Saved: X.npy shape={X.shape}, dtype={X.dtype}")
    print(f"       y.npy shape={y.shape}, num_classes={len(class_index)}")
    print(f"       class_index -> {OUT_DIR / 'class_index_z.json'}") #ラベルデータをjsonファイルとして保存(後ほどこちらも使うので注意)

if __name__ == "__main__":
    main()

実装2 (変換されたnpyファイルを使って機械学習)

学習データを交差検証法を使ってテストデータに分割し、6層の畳み込みニューラルネットワーク(Convolution Neural Netowrk)を使って学習を進め、学習した結果であるpickleファイルを出力。こちらと同じ手順をY座標、Z座標、z_quat、w_quatの学習データに対しても同様に行い、合計5つのpickelファイルを出力する。
以下の順序でコードを回すことで学習は完了する。

データの読み込み(以下はX座標データの例)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import pickle
import sys, os

train_data = np.load("/path/to/1_data/new/train_data_x.npy", allow_pickle=True)
train_data = np.transpose(train_data, (0, 3, 1, 2)) #モデルに読み込ませるためにデータを整形
train_label = np.load("/path/to/1_data/new/train_label_z_quat.npy", allow_pickle=True)
print(train_data.shape)
print(train_label.shape)

読み込んだデータの正規化

こちらを行うことで後述するモデルでの学習が円滑に進むようになる

train_data =  (train_data - train_data.min()) / train_data.max()
train_data = (train_data)/255
train_data = train_data.astype('float32')
print(train_data)

TrainとTestにデータを分割する

こちらのコード内でのAccuracy(AIの精度)の検証のために必要。
なお、TrainとTestの分割は7:3が一般的とのことなのでそのように分割している。

X_train, X_test, y_train, y_test = train_test_split(train_data, train_label, test_size=0.3, random_state=1234, shuffle=True)
print(X_train.shape, X_test.shape)

from sklearn.preprocessing import LabelBinarizer
lb = LabelBinarizer()


train = X_train
test = X_test
train_labels = y_train
test_labels = y_test

CNNの実装

6層のNN(ディープニューラルネットワーク)に畳み込み層を織り交ぜて実装。畳み込み層で画像の特徴量をW(重み)とb(バイアス)として抽出し次の層に持ち越している。特徴量だけを伝播させると学習がうまく行かなくなることも多いので極端な特徴量を均して学習を円滑化させるBatch Normalizationを畳み込み層の直後に挟み、その後にReLUを置くことで計算の簡易化とそれに伴い起こりうる特徴量の消失の防止を両立している。最後はaffineレイヤにて「この画像はどの分類に属する画像か?」を判別するための特徴ベクトルにデータが圧縮されるが、その圧縮を過不足なく行うためにDropoutという仕組みを入れてaffineレイヤから出力されるデータを調整している。なお、common.layersはこのネットの“部品”=各レイヤークラス(順伝播 forward と逆伝播 backward を実装したモジュール群)を提供している。CNN を実際に動かす心臓部(forward/backward の中身)とも言える。具体的な内容に関しては以下参照。

また、SoftmaxWithLossはaffineレイヤから出力されたデータを確率分布に変換し、損失の計算(正解クラスの確率を高く予測できているか)を行うという役割、それに加えて損失をどのように小さくすればいいかという情報を前の層に与える役割をもっている。

class DeepConvNet:
    def __init__(self, input_dim=(3, 370, 203),  # ★ (C=3, H=370, W=203)
                 conv_param_1 = {'filter_num':18, 'filter_size':3, 'pad':1, 'stride':1},
                 conv_param_2 = {'filter_num':18, 'filter_size':3, 'pad':1, 'stride':1},
                 conv_param_3 = {'filter_num':36, 'filter_size':3, 'pad':1, 'stride':1},
                 conv_param_4 = {'filter_num':36, 'filter_size':3, 'pad':2, 'stride':1},
                 conv_param_5 = {'filter_num':72, 'filter_size':3, 'pad':1, 'stride':1},
                 conv_param_6 = {'filter_num':72, 'filter_size':3, 'pad':1, 'stride':1},
                 hidden_size=50, output_size=10):

        C, H, W = input_dim  # (3, 370, 203)

        # --- 出力サイズを順番に計算 ---
        def conv_out(size, f, s, p):
            return int(1 + (size + 2*p - f) / s)

        # conv1
        H = conv_out(H, conv_param_1['filter_size'], conv_param_1['stride'], conv_param_1['pad'])
        W = conv_out(W, conv_param_1['filter_size'], conv_param_1['stride'], conv_param_1['pad'])
        # conv2 -> pool(2,2)
        H = conv_out(H, conv_param_2['filter_size'], conv_param_2['stride'], conv_param_2['pad']) // 2
        W = conv_out(W, conv_param_2['filter_size'], conv_param_2['stride'], conv_param_2['pad']) // 2
        # conv3
        H = conv_out(H, conv_param_3['filter_size'], conv_param_3['stride'], conv_param_3['pad'])
        W = conv_out(W, conv_param_3['filter_size'], conv_param_3['stride'], conv_param_3['pad'])
        # conv4 -> pool(2,2)
        H = conv_out(H, conv_param_4['filter_size'], conv_param_4['stride'], conv_param_4['pad']) // 2
        W = conv_out(W, conv_param_4['filter_size'], conv_param_4['stride'], conv_param_4['pad']) // 2
        # conv5
        H = conv_out(H, conv_param_5['filter_size'], conv_param_5['stride'], conv_param_5['pad'])
        W = conv_out(W, conv_param_5['filter_size'], conv_param_5['stride'], conv_param_5['pad'])
        # conv6 -> pool(2,2)
        H = conv_out(H, conv_param_6['filter_size'], conv_param_6['stride'], conv_param_6['pad']) // 2
        W = conv_out(W, conv_param_6['filter_size'], conv_param_6['stride'], conv_param_6['pad']) // 2

        final_c = conv_param_6['filter_num']
        final_size = H * W * final_c
        print("Final feature map:", (final_c, H, W), "->", final_size)

        # He初期化
        import numpy as np
        pre_node_nums = np.array([
            C*3*3, 18*3*3, 18*3*3, 36*3*3, 36*3*3, 36*3*3,
            final_size, hidden_size
        ])
        weight_init_scales = np.sqrt(2.0 / pre_node_nums)

        self.params = {}
        pre_ch = C
        for idx, cp in enumerate([conv_param_1, conv_param_2, conv_param_3,
                                  conv_param_4, conv_param_5, conv_param_6]):
            FN, FH = cp['filter_num'], cp['filter_size']
            self.params[f'W{idx+1}'] = weight_init_scales[idx] * \
                                       np.random.randn(FN, pre_ch, FH, FH)
            self.params[f'b{idx+1}'] = np.zeros(FN)
            pre_ch = FN
            # add BN parameters for this conv layer
            self.params[f'gamma{idx+1}'] = np.ones(FN)
            self.params[f'beta{idx+1}']  = np.zeros(FN)

        self.params['W7'] = weight_init_scales[6] * np.random.randn(final_size, hidden_size)
        self.params['b7'] = np.zeros(hidden_size)
        self.params['W8'] = weight_init_scales[7] * np.random.randn(hidden_size, output_size)
        self.params['b8'] = np.zeros(output_size)

        # --- レイヤ構築(元コードと同じ)---
        self.layers = []
        self.layers.append(Convolution(self.params['W1'], self.params['b1'],
                                       conv_param_1['stride'], conv_param_1['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(Convolution(self.params['W2'], self.params['b2'],
                                       conv_param_2['stride'], conv_param_2['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(MaxPooling(pool_h=2, pool_w=2, stride=2))

        self.layers.append(Convolution(self.params['W3'], self.params['b3'],
                                       conv_param_3['stride'], conv_param_3['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(Convolution(self.params['W4'], self.params['b4'],
                                       conv_param_4['stride'], conv_param_4['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(MaxPooling(pool_h=2, pool_w=2, stride=2))

        self.layers.append(Convolution(self.params['W5'], self.params['b5'],
                                       conv_param_5['stride'], conv_param_5['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(Convolution(self.params['W6'], self.params['b6'],
                                       conv_param_6['stride'], conv_param_6['pad']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(MaxPooling(pool_h=2, pool_w=2, stride=2))

        self.layers.append(Affine(self.params['W7'], self.params['b7']))
        self.layers.append(BatchNormalization(self.params['gamma1'], self.params['beta1']))
        self.layers.append(ReLU())
        self.layers.append(Dropout(0.15))
        self.layers.append(Affine(self.params['W8'], self.params['b8']))

        self.last_layer = SoftmaxWithLoss()

    def predict(self, x, train_flg=False):
        for layer in self.layers:
            # Dropout / BatchNorm を使っている場合は学習フラグを渡す
            if isinstance(layer, (Dropout, BatchNormalization)):
                x = layer.forward(x, train_flg)
            else:
                x = layer.forward(x)
        return x

    def loss(self, x, t):
        """
        t は one-hot (N, num_classes) を想定。
        SoftmaxWithLoss.forward(y, t) が one-hot を受け取れる実装であること。
        """
        y = self.predict(x, train_flg=True)
        return self.last_layer.forward(y, t)

    def accuracy(self, x, t, batch_size=100):
        """
        one-hot ラベルにも整数ラベルにも対応。
        """
        # 必要に応じて one-hot -> 整数 に変換
        if t.ndim != 1:
            t = np.argmax(t, axis=1)

        acc = 0.0
        N = x.shape[0]
        if batch_size is None or batch_size >= N:
            y = self.predict(x, train_flg=False)
            y = np.argmax(y, axis=1)
            return float(np.sum(y == t)) / N

        iters = (N + batch_size - 1) // batch_size
        for i in range(iters):
            tx = x[i*batch_size : (i+1)*batch_size]
            tt = t[i*batch_size : (i+1)*batch_size]
            y = self.predict(tx, train_flg=False)
            y = np.argmax(y, axis=1)
            acc += np.sum(y == tt)
        return acc / N

    def gradient(self, x, t):
        """
        勾配計算:forward で loss を評価 → last_layer から逆伝播 → 各層の dW/db を集約
        """
        # forward
        self.loss(x, t)

        # backward
        dout = 1
        dout = self.last_layer.backward(dout)

        tmp_layers = self.layers.copy()
        tmp_layers.reverse()
        for layer in tmp_layers:
            dout = layer.backward(dout)

        # 勾配をまとめる(Convolution/Affine の層index)
        grads = {}
        # ※ 下の index は以下の層並びを前提:
        # [0]Conv1, [1] BN, [2]ReLU, [3]Conv2, [4]BN, [5]ReLU, [6]MP,
        # [7]Conv3, [8]BN, [9]ReLU, [10]Conv4, [11]BN, [12]ReLU, [13]MP
        # [14]Conv5, [15]BN, [16]ReLU, [17]Conv6, [18]BN, [19]ReLU,[20]MP
        # [21]Affine(W7), [22]BN, [23]ReLU,[24]Dropout,[25]Affine(W8)
        for i, layer_idx in enumerate((0, 3, 7, 10, 14, 17, 21, 25)):
            grads['W' + str(i+1)] = self.layers[layer_idx].dW
            grads['b' + str(i+1)] = self.layers[layer_idx].db

        # BN layers: add grads
        bn_indices = [1, 4, 8, 11, 15, 18, 22]  # if each conv followed by BN
        for j, idx in enumerate(bn_indices):
            grads['gamma' + str(j+1)] = self.layers[idx].dgamma
            grads['beta'  + str(j+1)] = self.layers[idx].dbeta

        return grads

上記モデルを利用しデータの学習を行う

以下コード内のepochsの回数分だけ学習を行う。初回の学習が一番時間がかかる。データ量やCPU、GPUのスペックによっては数十分~数時間かかることもザラ、というかスペックが足りない場合はPCがハングするので注意。GPU使用推奨。設定される学習率(lr)はAdam(活性化関数)を使う場合は一般的には1e-3がベターとのことだが、試行錯誤を繰り返しチューニングを行うことを推奨。

# データ読み込み
x = train           # shape: (N, 370, 203, 3)
print(x.shape)
t = train_labels    # shape: (N, num_classes) one-hot
print(t.shape)

# NHWC -> NCHW 変換

print("x shape:", x.shape)
print("t shape:", t.shape)

# ハイパーパラメータ
epochs = 30
batch_size = 32
lr = 1e-3
rho = 0.9

# Optimizer選択(例: Adadelta)
#optimizer = SGD(lr=lr)
#optimizer = RMSProp(lr=lr, rho=0.9)
#optimizer = Adam(lr = lr)
optimizer = Adadelta(rho=0.9)

# データ数
xsize = x.shape[0]
iter_num = int(np.ceil(xsize / batch_size))

# ネットワーク構築
dnet = DeepConvNet(input_dim=(3, 64, 64),
                   conv_param_1={'filter_num':18, 'filter_size':3, 'pad':1, 'stride':1},
                   conv_param_2={'filter_num':18, 'filter_size':3, 'pad':1, 'stride':1},
                   conv_param_3={'filter_num':36, 'filter_size':3, 'pad':1, 'stride':1},
                   conv_param_4={'filter_num':36, 'filter_size':3, 'pad':2, 'stride':1},
                   conv_param_5={'filter_num':72, 'filter_size':3, 'pad':1, 'stride':1},
                   conv_param_6={'filter_num':72, 'filter_size':3, 'pad':1, 'stride':1},
                   hidden_size=50,
                   output_size=t.shape[1])  # 出力次元をラベル数に合わせる

# ログ
train_loss, test_loss = [], []
train_accuracy, test_accuracy = [], []
#print("X shape:", x_batch.shape)  # ミニバッチの入力 shape
#print("First Conv W shape:", dnet.params['W1'].shape)  # 最初の畳み込み層の重み shape

for epoch in range(epochs):
    print(f"epoch={epoch}")

    # シャッフル
    idx = np.arange(xsize)
    np.random.shuffle(idx)

    for it in range(iter_num):
        # ミニバッチ取り出し
        mask = idx[batch_size*it : batch_size*(it+1)]
        x_batch = x[mask]
        t_batch = t[mask]

        # 勾配計算
        grads = dnet.gradient(x_batch, t_batch)

        # パラメータ更新
        optimizer.update(dnet.params, grads)


    # ===== 学習経過の記録 =====
    # 訓練データのloss
    train_loss.append(dnet.loss(x, t))

    # テストデータのloss
    test_loss.append(dnet.loss(test, test_labels))

    # 精度
    train_accuracy.append(dnet.accuracy(x, t))
    test_accuracy.append(dnet.accuracy(test, test_labels))

    print(f" -> loss(train): {train_loss[-1]:.4f}, acc(train): {train_accuracy[-1]:.4f}, acc(test): {test_accuracy[-1]:.4f}")

学習結果のグラフ化

# lossとaccuracyのグラフ化
df_log = pd.DataFrame({"train_loss":train_loss,
             "test_loss":test_loss,
             "train_accuracy":train_accuracy,
             "test_accuracy":test_accuracy})

df_log.plot(style=['r-', 'r--', 'b-', 'b--'])
plt.ylim([0,7])
plt.ylabel("Accuracy or loss")
plt.xlabel("epochs")
plt.show()

学習したpickleファイルの出力

こちらのpickleファイルは以下の実装3で使うので出力する場所に注意

with open("/path/to/trajectory_x_model.pickle", "wb") as f:
    pickle.dump(dnet, f)

実装3 (学習した結果を利用し動画データから軌跡データを作成する: Video to Trajectory)

以下はX座標、Y座標、Z座標、z_quat、w_quatのデータをCSVに落とし込むケース。
各category関数の中でpickleファイルを使ったクラス類推を行い、データを返していく。
なお、Speedのデータもこちらで落とし込んでいるが、データ収集の方法及び学習の方法は別途記事で記載予定。

import os
import cv2
import numpy as np
import pickle
import tensorflow as tf
from datetime import datetime
import csv
import json


x_json_path = "/path/to/1_data/new/class_index_x.json"
# npyファイル変換時に保存したJSONファイルを読み込む
with open(x_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# キーを int 化してリストにまとめる
x_int_keys = [int(k) for k in data.keys()]
#print(x_int_keys)

y_json_path = "/path/to/1_data/new/class_index_y.json"
# JSONファイルを読み込む
with open(y_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# キーを int 化してリストにまとめる
y_int_keys = [int(k) for k in data.keys()]

z_json_path = "/path/to/1_data/new/class_index_z.json"
# JSONファイルを読み込む
with open(z_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# キーを int 化してリストにまとめる
z_int_keys = [float(k) for k in data.keys()]
#print(z_int_keys)

zq_json_path = "/path/to/1_data/new/class_index_z_quat.json"
# JSONファイルを読み込む
with open(zq_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# キーを int 化してリストにまとめる
zq_int_keys = [float(k) for k in data.keys()]
#print(zq_int_keys)

wq_json_path = "/path/to/1_data/new/class_index_w_quat.json"
# JSONファイルを読み込む
with open(wq_json_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# キーを int 化してリストにまとめる
wq_int_keys = [float(k) for k in data.keys()]
print(wq_int_keys)


def wq_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_z_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories: List[int] = wq_int_keys  # 例: [89613, 89614, ...] のようなintリスト

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    # --- RGB (BGR→RGB) ---
    if img.ndim == 3:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    # --- resize ---
    rgb = cv2.resize(rgb, (64, 64), interpolation=cv2.INTER_AREA)

    # --- shape: NCHW (1,3,64,64) ---
    x = rgb.astype(np.float32)
    x = np.expand_dims(x, axis=0)           # (1,64,64,3)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,3,64,64)

    # --- inference ---
    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat
    return mess


def zq_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_z_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories: List[int] = zq_int_keys  # 例: [89613, 89614, ...] のようなintリスト

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    # --- RGB (BGR→RGB) ---
    if img.ndim == 3:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    # --- resize ---
    rgb = cv2.resize(rgb, (64, 64), interpolation=cv2.INTER_AREA)

    # --- shape: NCHW (1,3,64,64) ---
    x = rgb.astype(np.float32)
    x = np.expand_dims(x, axis=0)           # (1,64,64,3)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,3,64,64)

    # --- inference ---
    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat
    return mess


def z_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_z_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories: List[int] = z_int_keys  # 例: [89613, 89614, ...] のようなintリスト

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    # --- RGB (BGR→RGB) ---
    if img.ndim == 3:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    # --- resize ---
    rgb = cv2.resize(rgb, (64, 64), interpolation=cv2.INTER_AREA)

    # --- shape: NCHW (1,3,64,64) ---
    x = rgb.astype(np.float32)
    x = np.expand_dims(x, axis=0)           # (1,64,64,3)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,3,64,64)

    # --- inference ---
    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat
    return mess


def y_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_y_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories: List[int] = y_int_keys  # 例: [89613, 89614, ...] のようなintリスト

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    # --- RGB (BGR→RGB) ---
    if img.ndim == 3:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    # --- resize ---
    rgb = cv2.resize(rgb, (64, 64), interpolation=cv2.INTER_AREA)

    # --- shape: NCHW (1,3,64,64) ---
    x = rgb.astype(np.float32)
    x = np.expand_dims(x, axis=0)           # (1,64,64,3)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,3,64,64)

    # --- inference ---
    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat
    return mess


def x_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_x_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories: List[int] = x_int_keys  # 例: [89613, 89614, ...] のようなintリスト

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    # --- RGB (BGR→RGB) ---
    if img.ndim == 3:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    # --- resize ---
    rgb = cv2.resize(rgb, (64, 64), interpolation=cv2.INTER_AREA)

    # --- shape: NCHW (1,3,64,64) ---
    x = rgb.astype(np.float32)
    x = np.expand_dims(x, axis=0)           # (1,64,64,3)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,3,64,64)

    # --- inference ---
    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat
    return mess

def n_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_n_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories = [30, 31, 32, 33, 34, 35, 36, 37]

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    if img.ndim == 3:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    elif img.ndim == 2:
        gray = img
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    gray = cv2.resize(gray, (64, 64), interpolation=cv2.INTER_AREA)

    x = gray.astype(np.float32)
    x = np.expand_dims(x, axis=(0, -1))     # (1,64,64,1)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,1,64,64)  NCHW

    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat // 3.6
    return mess

def main():
    # ==== 設定 ====
    VIDEO_PATH = str(input("Enter video path: "))  # 入力動画パス
    STEP_SEC = 1                                   # 1秒ごと
    START_SEC = 0                                 # 開始:60s(含む)
    END_SEC   = 40                                # 終了:100s(含む)

    # ROI(右上)
    ROI_WIDTH_PCT    = 0.15
    ROI_HEIGHT_PCT   = 0.20
    MARGIN_RIGHT_PCT = 0.01
    MARGIN_TOP_PCT   = 0.02

    # 出力先(権限問題を避けたい場合は ~/ を推奨)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    OUT_DIR = os.path.expanduser(f"/path/to/crops_top_right_{ts}")
    os.makedirs(OUT_DIR, exist_ok=True)
    CSV_PATH = os.path.join(OUT_DIR, "result.csv")

    # ==== 動画情報 ====
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        raise FileNotFoundError(f"Cannot open: {VIDEO_PATH}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
    if fps <= 0:
        fps = 30.0
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 else -1
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration_sec = (frame_count / fps) if frame_count > 0 else 0

    # 60〜100秒の範囲を動画長にクランプ
    start_s = max(0, START_SEC)
    end_s   = min(int(duration_sec), END_SEC)
    if end_s < start_s:
        raise ValueError(f"Video too short for requested range: {duration_sec:.2f}s")

    # ==== ROI ====
    x2 = int(w * (1.0 - MARGIN_RIGHT_PCT))
    x1 = max(0, x2 - int(w * ROI_WIDTH_PCT))
    y1 = int(h * MARGIN_TOP_PCT)
    y2 = min(h, y1 + int(h * ROI_HEIGHT_PCT))
    x1 = max(0, min(x1, w-1)); x2 = max(1, min(x2, w))
    y1 = max(0, min(y1, h-1)); y2 = max(1, min(y2, h))

    # デバッグ:最初のフレームにROIを描画
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    ok, first = cap.read()
    if ok and first is not None:
        dbg = first.copy()
        cv2.rectangle(dbg, (x1, y1), (x2, y2), (0,255,0), 2)
        cv2.imwrite(os.path.join(OUT_DIR, "debug_roi_on_first_frame.png"), dbg)

    # ==== 60〜100秒を1秒ごとに処理し、各行を出力 ====
    with open(CSV_PATH, "w", newline="", encoding="utf-8") as fcsv:
        writer = csv.writer(fcsv)

        # 必要ならヘッダーも書く(任意)
        header = [
            "x", "y", "z",
            "x_quat", "y_quat", "z_quat", "w_quat",
            "speed"
        ]
        # ここに row の列名を足したい場合は header + [...] として書けます
        writer.writerow(header)

        for s in range(start_s, end_s + 1, STEP_SEC):  # 両端含む
            # 指定秒のフレームへ
            fidx = None
            if frame_count > 0:
                fidx = min(int(round(s * fps)), max(0, frame_count - 1))
                cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
            else:
                cap.set(cv2.CAP_PROP_POS_MSEC, s * 1000.0)

            ok, frame = cap.read()
            if not ok or frame is None:
                # 取得できなければ空行(8列空)を入れて継続
                writer.writerow([""] * 8)
                print(f"[{s:>3}s] frame read failed -> empty row")
                continue

            crop = frame[y1:y2, x1:x2]

            try:
                mess_value_n = n_categorizer(crop)
                mess_value_x = x_categorizer(frame)
                mess_value_y = y_categorizer(frame)
                mess_value_z = z_categorizer(frame)
                mess_value_zq = zq_categorizer(frame)
                mess_value_wq = wq_categorizer(frame)
            except Exception as e:
                mess_value_n = ""
                mess_value_x = ""
                mess_value_y = ""
                mess_value_z = ""
                mess_value_zq = ""
                mess_value_wq = ""
                print(f"[{s:>3}s] inference error: {e}")

            # 8列目だけ値、1〜7列は空
            row = [mess_value_x] + [mess_value_y] + [mess_value_z] + [0] * 2 + [mess_value_zq] + [mess_value_wq] + [mess_value_n]
                    # 1列目に "x,y,z,x_quat,y_quat,z_quat,w_quat,speed" を固定で入れ、
            # 2列目以降に row を入れる
            csv_row = row

            writer.writerow(csv_row)
            print(f"[{s:>3}s] fidx={fidx if fidx is not None else '-'} -> mess_value_n={mess_value_n}, mexx_value_x={mess_value_x}")

    cap.release()
    print(f"\nCSV saved to: {CSV_PATH}")

if __name__ == "__main__":
    main()

実装4(簡単に軌跡データを作成できるUIを作成)

以下は上記コードをバックエンドとして使い、flaskを使ってフロントエンド(html)を呼び出して簡単に軌跡データを作成できるようにしたコード。
なおこちらを使用する際のディレクトリの構成は以下

root
 |-v2t.py(バックエンドコード)
 |-x_category.py(X座標を類推するコード)
 |-y_category.py(Y座標を類推するコード)
 |-z_category.py(Z座標を類推するコード)
 |-zq_category.py(z_quatを類推するコード)
 |-wq_category.py(w_quatを類推するコード)
 |-templates(フロントエンドコード)
   |-index.html
   |-progress.html
   └-complete.html
 |-statistic
  └-picture.png(処理完了後に表示される画像ファイル)
  

バックエンド(flaskを使ったpythonコード)

from flask import Flask, render_template, request, redirect, url_for, send_file, jsonify, abort
import os
import csv
import threading
import time
import tempfile
import cv2
import numpy as np
import pickle
import tensorflow as tf
from common.layers import DeepConvNet
from datetime import datetime
import json

# --- test.py から推論関数をインポート ---
#   n_categorizer, x_categorizer, y_categorizer, z_categorizer, zq_categorizer, wq_categorizer を使用
from test import (
    n_categorizer, x_categorizer, y_categorizer,
    z_categorizer, zq_categorizer, wq_categorizer
)

app = Flask(__name__)

# 設定
UPLOAD_DIR = os.path.abspath("uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)

progress = {
    "percent": 0,
    "status": "idle",
    "error": ""
}
RESULT_CSV_PATH = None  # 完了時にセット
GARY_PATH = "/path/to/gary.png"  # 添付画像のフルパス(staticへコピー不要)

# 推論パラメータ(test.py の main にあった既定値を移植)
STEP_SEC = 1
START_SEC = 0
END_SEC = 40
# ROI(右上)
ROI_WIDTH_PCT    = 0.10
ROI_HEIGHT_PCT   = 0.20
MARGIN_RIGHT_PCT = 0.20
MARGIN_TOP_PCT   = 0.02


def process_video_to_csv(video_path: str, out_dir: str):
    """
    test.py の main() がやっていた処理を Flask 用に関数化。
    - 0〜40秒を1秒ごとにフレーム切り出し
    - 右上ROIを crop
    - 6つの categorizer で推論
    - CSV 1行目は固定ヘッダ、2行目以降は実値+row(test.py同様の列構成)
    """
    global progress, RESULT_CSV_PATH
    progress.update({"percent": 0, "status": "running", "error": ""})

    # 出力CSV
    tsdir = out_dir
    os.makedirs(tsdir, exist_ok=True)
    csv_path = os.path.join(tsdir, "result.csv")

    # 動画
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        progress.update({"status": "error", "error": f"Cannot open: {video_path}"})
        raise FileNotFoundError(f"Cannot open: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 else -1
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration_sec = (frame_count / fps) if frame_count > 0 else 0

    start_s = max(0, START_SEC)
    end_s   = min(int(duration_sec), END_SEC)
    if end_s < start_s:
        progress.update({"status": "error", "error": "Video too short"})
        raise ValueError(f"Video too short: {duration_sec:.2f}s")

    # ROI
    x2 = int(w * (1.0 - MARGIN_RIGHT_PCT))
    x1 = max(0, x2 - int(w * ROI_WIDTH_PCT))
    y1 = int(h * MARGIN_TOP_PCT)
    y2 = min(h, y1 + int(h * ROI_HEIGHT_PCT))
    x1 = max(0, min(x1, w-1)); x2 = max(1, min(x2, w))
    y1 = max(0, min(y1, h-1)); y2 = max(1, min(y2, h))

    # 進捗総数(サンプル数)
    total_steps = max(1, (end_s - start_s + 1))
    done = 0

    with open(csv_path, "w", newline="", encoding="utf-8") as fcsv:
        writer = csv.writer(fcsv)

        # 1行目:固定ヘッダ(文字列)
        header = ["x","y","z","x_quat","y_quat","z_quat","w_quat","speed"]
        writer.writerow(header)

        # 2行目以降:実値 + 推論row
        for s in range(start_s, end_s + 1, STEP_SEC):
            # 対応フレームへ
            if frame_count > 0:
                fidx = min(int(round(s * fps)), max(0, frame_count - 1))
                cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
            else:
                cap.set(cv2.CAP_PROP_POS_MSEC, s * 1000.0)

            ok, frame = cap.read()
            if not ok or frame is None:
                # フレーム取得失敗時は空行
                writer.writerow([""] * 8)
                done += 1
                progress["percent"] = int(done / total_steps * 100)
                continue

            crop = frame[y1:y2, x1:x2]

            # test.py と同様の出力列構成:
            # row = [mess_value_x, mess_value_y, mess_value_z, 0, 0, mess_value_zq, mess_value_wq, mess_value_n]
            # try:
            mv_n  = n_categorizer(crop)
            mv_x  = x_categorizer(frame)
            mv_y  = y_categorizer(frame)
            mv_z  = z_categorizer(frame)
            mv_zq = zq_categorizer(frame)
            mv_wq = wq_categorizer(frame)
            # except Exception as e:
            #     mv_n = mv_x = mv_y = mv_z = mv_zq = mv_wq = ""
            #     # 続行(CSVには空欄等で記録)
            
            row = [mv_x, mv_y, mv_z, 0, 0, mv_zq, mv_wq, mv_n]

            writer.writerow(row)

            done += 1
            progress["percent"] = int(done / total_steps * 100)

    cap.release()
    RESULT_CSV_PATH = csv_path
    progress.update({"status": "done"})


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/upload", methods=["POST"])
def upload():
    if "file" not in request.files:
        abort(400, "no file")
    file = request.files["file"]
    if not file.filename.lower().endswith(".mp4"):
        abort(400, "mp4 only")

    save_path = os.path.join(UPLOAD_DIR, file.filename)
    file.save(save_path)

    # 一時ディレクトリへ出力
    out_dir = tempfile.mkdtemp(prefix="v2t_")

    # バックグラウンド実行
    t = threading.Thread(target=process_video_to_csv, args=(save_path, out_dir), daemon=True)
    t.start()

    return redirect(url_for("progress_page"))


@app.route("/progress")
def progress_page():
    return render_template("progress.html")


@app.route("/progress_status")
def progress_status():
    return jsonify(progress)


@app.route("/download")
def download():
    global RESULT_CSV_PATH
    if progress.get("status") != "done" or not RESULT_CSV_PATH or not os.path.exists(RESULT_CSV_PATH):
        abort(404)
    return send_file(RESULT_CSV_PATH, as_attachment=True, download_name="result.csv")


@app.route("/complete")
def complete():
    # /mnt/data/gary.png をそのまま返すためのURLを渡す
    return render_template("complete.html", image_url=url_for("gary_image"))


@app.route("/gary.png")
def gary_image():
    if not os.path.exists(GARY_PATH):
        abort(404)
    return send_file(GARY_PATH, mimetype="image/png")


if __name__ == "__main__":
    # 本番は debug=False を推奨
    app.run(host="0.0.0.0", port=5000, debug=True)

使い方

terminalから上記コードのディレクトリに移動し、python3 v2t.pyコマンドを入力するとローカルアドレスが発行され、そちらに遷移することで以下のようなUIが出現、使用可能となる。

その他

アルゴリズム選定の工夫

層の深いCNNを選択したのはシミュレーターの出力する複雑な特徴量を持つ画像データも扱えると考えたため。最初は単に畳み込み層により構成されるモデルを使用して学習を進めていたが、学習が収束しなかったためReLuとBatch Normalizationの組み合わせを層に挟み込むなどの工夫を行うことで少ない学習データでもひとまずの学習の収束を図ることができた。

データセットの取得方法

OptunaとAWSIMを併用することで自動で動画データが溜まることを利用し、別途記事記載の走行時に座標データをCSVファイルとして保存できるコードを活用しデータを取得していった。その後こちらも別途記載予定の自動でデータを仕分けしてくれるコードを活用することで学習データ、ラベルデータを取得する前処理を行った。

学習での失敗談、成功談

使っているデスクトップPCのGPUが旧式だったので学習中にPCがハングするのでGoogle Colabを使用したが、有料プランで最高性能を持つv-6e TPUを使用しても依然ハングしてしまう状況だったので学習データを絞らざるを得ない状況になった。結果として一つの座標に対して7000~8000程度しかデータが処理できなかったため学習が収束しないという状況に直面したが、Bath NormalizationとReLuの組み合わせ及びMax Poolingを処理に挟むことで少ない学習データでも学習を収束させることに成功した。ただ結果を見る限りどうも局所最適化となっている傾向で、改めてGPUをアップデートした後に再学習を行う必要がありそうだった。

推論時の工夫

一旦argmax方式で最尤の推論結果を出す方式を取ったが、上記の局所最適化になっている傾向を鑑み引き続き要改善。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?