はじめに
以前「機械学習で動画データから座標を取ってみよう」というテーマで記事を書いたが、CSVファイルにspeedの項目をどのように学習し、データを返すかを記述していなかったのでこちらで補足
以前の記事
前提
こちらの記事は以下の3つのパートで構成される。
1どうやって学習を行うための学習データ、ラベルデータを取得するか
2学習データ、ラベルデータからどのようにAIに学習させるか
3学習させたデータを使ってどのように速度データを返していくか
1どうやって学習を行うための学習データ、ラベルデータを取得するか
まずはAIに学習させるために画像データをnpyファイルに変換する必要があるのでそちらの前準備。
手順としては予めスクショで以下のように画像データを保存しておく
...
今回は30km/h~37km/hを想定しているので一旦こちらの7枚
その後30~37の7つのフォルダを作成しそれぞれに対応する画像を保存する。
これでnpyファイルに変換する前準備はOK。
フォルダ構成は以下のようにしておく
root
|-py or jpynbファイル(npyに変換するコード)
|- 1_data
|- new (npyファイルを格納するフォルダ)
|-30(対応する画像を入れるフォルダ)
|-31
|-32
...
└-37
こちら準備が出来たら以下のコードを実行(詳細は前回記事である以下を参照)
# -*- coding: utf-8 -*-
import os
from pathlib import Path
import json
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical # np_utils 互換
from tensorflow.keras.utils import load_img, img_to_array, array_to_img
# ========= 設定 =========
ROOT_DIR = Path(r"/path/to/root") # ←当コードが格納されているフォルダ
OUT_DIR = Path(r"/path/to/1_data/new") # npy等の出力先
OUT_DIR.mkdir(parents=True, exist_ok=True)
IMG_SIZE = (64, 64)
COLOR_MODE = "grayscale" # "rgb" か "grayscale"。今回読み込むのは数字なのでgrayscaleでOK
DTYPE = "float32" # 出力配列のdtype
FILE_PATTERNS = ("*.jpeg", "*.jpg", "*.png") # 読み込む拡張子
# 水増し設定(0なら水増しなし。今回は各フォルダ1000水増ししたいので1000を入力)
AUG_PER_IMAGE = 100
datagen = ImageDataGenerator(
rotation_range=0,
width_shift_range=0.0,
height_shift_range=0.0,
zoom_range=0.1,
# horizontal_flip=True,
# vertical_flip=True,
)
# ========= ここから処理 =========
def iter_image_paths(folder: Path):
for pat in FILE_PATTERNS:
for p in folder.glob(pat):
if p.is_file():
yield p
def load_image_as_array(path: Path):
# Keras load_img: color_mode="grayscale" | "rgb"
img = load_img(str(path), color_mode=COLOR_MODE, target_size=IMG_SIZE)
arr = img_to_array(img) # HxWx1 or HxWx3
return arr
def main():
X_list = []
y_list = []
class_index = {} # {"folder_name": class_id}
# サブフォルダをクラスとして走査
subfolders = sorted([p for p in ROOT_DIR.iterdir() if p.is_dir()])
#print(subfolders)
#subfolders = dic_x
if not subfolders:
raise RuntimeError(f"No subfolders found under: {ROOT_DIR}")
for cid, cls_dir in enumerate(subfolders):
class_index[cls_dir.name] = cid
print("cid: " + str(cid))
imgs = list(iter_image_paths(cls_dir))
if not imgs:
print(f"[WARN] no images in: {cls_dir}")
continue
print(f"[{cls_dir.name}] files={len(imgs)} -> class_id={cid}")
for img_path in imgs:
try:
arr = load_image_as_array(img_path) # (H, W, C)
except Exception as e:
print(f" [SKIP] {img_path} -> {e}")
continue
X_list.append(arr)
y_list.append(cid)
# ---- 水増し(メモリ内)----
if AUG_PER_IMAGE > 0:
x = np.expand_dims(arr, axis=0) # (1, H, W, C)
g = datagen.flow(x, batch_size=1, shuffle=False)
for _ in range(AUG_PER_IMAGE):
aug = next(g)[0] # (H, W, C)
X_list.append(aug)
y_list.append(cid)
if not X_list:
raise RuntimeError("No images collected. Check ROOT_DIR / patterns / structure.")
X = np.stack(X_list).astype(DTYPE) # (N, H, W, C)
y = np.array(y_list, dtype=np.int64)
y = to_categorical(y, num_classes=len(class_index))
# 正規化など必要ならここで
# X /= 255.0
# 保存
np.save(OUT_DIR / "train_data_n.npy", X) #速度を表す数字の学習データを作成
np.save(OUT_DIR / "train_label_n.npy", y) #速度を表す数字のラベルデータを作成
with open(OUT_DIR / "class_index_n.json", "w", encoding="utf-8") as f:
json.dump(class_index, f, ensure_ascii=False, indent=2)
print(f"Saved: X.npy shape={X.shape}, dtype={X.dtype}")
print(f" y.npy shape={y.shape}, num_classes={len(class_index)}")
print(f" class_index -> {OUT_DIR / 'class_index.json'}")
if __name__ == "__main__":
main()
以上でnpyファイルの準備は完了。
2学習データ、ラベルデータからどのようにAIに学習させるか
前回記事からtrain_data, label_dataの箇所だけを以下のように変更
train_data = np.load("/path/to/1_data/new/train_data_n.npy", allow_pickle=True)
train_data = np.transpose(train_data, (0, 3, 1, 2))
train_label = np.load("/path/to/1_data/new/train_label_n.npy", allow_pickle=True)
print(train_data.shape)
print(train_label.shape)
それ以外は前回記事と同様
なお、フォルダは2で作成したrootディレクトリ直下に入れる。
3学習させたデータを使ってどのように速度データを返していくか
2で得たpickleデータを使って以下のように速度データを類推するコードを作成
def n_categorizer(img: np.ndarray):
with open("/path/to/trajectory_n_model.pickle", "rb") as f:
model = pickle.load(f)
categories = [30, 31, 32, 33, 34, 35, 36, 37]
if img is None:
raise ValueError("img is None. Pass a valid cropped image array.")
if img.ndim == 3:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
elif img.ndim == 2:
gray = img
else:
raise ValueError(f"Unsupported img shape: {img.shape}")
gray = cv2.resize(gray, (64, 64), interpolation=cv2.INTER_AREA)
x = gray.astype(np.float32)
x = np.expand_dims(x, axis=(0, -1)) # (1,64,64,1)
x = np.transpose(x, (0, 3, 1, 2)) # (1,1,64,64) NCHW
features = model.predict(x)
# --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
if features.ndim == 2 and features.shape[1] == len(categories):
idx = int(np.argmax(features[0])) # しきい値は使わず常にargmax
cat = categories[idx]
else:
# 回帰的出力など:最も近いカテゴリへ丸め
val = float(np.ravel(features)[0])
idx = int(np.argmin([abs(val - c) for c in categories]))
cat = categories[idx]
mess = cat // 3.6 #軌跡データを作成することを想定し時速を分速に変換
return mess
上記の速度類推の関数を以下のように活用する
def main():
# ==== 設定 ====
VIDEO_PATH = str(input("Enter video path: ")) # 入力動画パス
STEP_SEC = 1 # 1秒ごと
START_SEC = 0 # 開始:60s(含む)
END_SEC = 40 # 終了:100s(含む)
# ROI(右上)
ROI_WIDTH_PCT = 0.15
ROI_HEIGHT_PCT = 0.20
MARGIN_RIGHT_PCT = 0.01
MARGIN_TOP_PCT = 0.02
# 出力先(権限問題を避けたい場合は ~/ を推奨)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
OUT_DIR = os.path.expanduser(f"/home/kota/v2t/crops_top_right_{ts}")
os.makedirs(OUT_DIR, exist_ok=True)
CSV_PATH = os.path.join(OUT_DIR, "result.csv")
# ==== 動画情報 ====
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
raise FileNotFoundError(f"Cannot open: {VIDEO_PATH}")
fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
if fps <= 0:
fps = 30.0
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 else -1
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
duration_sec = (frame_count / fps) if frame_count > 0 else 0
# 60〜100秒の範囲を動画長にクランプ
start_s = max(0, START_SEC)
end_s = min(int(duration_sec), END_SEC)
if end_s < start_s:
raise ValueError(f"Video too short for requested range: {duration_sec:.2f}s")
# ==== ROI ====
x2 = int(w * (1.0 - MARGIN_RIGHT_PCT))
x1 = max(0, x2 - int(w * ROI_WIDTH_PCT))
y1 = int(h * MARGIN_TOP_PCT)
y2 = min(h, y1 + int(h * ROI_HEIGHT_PCT))
x1 = max(0, min(x1, w-1)); x2 = max(1, min(x2, w))
y1 = max(0, min(y1, h-1)); y2 = max(1, min(y2, h))
# デバッグ:最初のフレームにROIを描画
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
ok, first = cap.read()
if ok and first is not None:
dbg = first.copy()
cv2.rectangle(dbg, (x1, y1), (x2, y2), (0,255,0), 2)
cv2.imwrite(os.path.join(OUT_DIR, "debug_roi_on_first_frame.png"), dbg)
# ==== 60〜100秒を1秒ごとに処理し、各行を出力 ====
with open(CSV_PATH, "w", newline="", encoding="utf-8") as fcsv:
writer = csv.writer(fcsv)
# 必要ならヘッダーも書く(任意)
header = [
"x", "y", "z",
"x_quat", "y_quat", "z_quat", "w_quat",
"speed"
]
# ここに row の列名を足したい場合は header + [...] として書けます
writer.writerow(header)
for s in range(start_s, end_s + 1, STEP_SEC): # 両端含む
# 指定秒のフレームへ
fidx = None
if frame_count > 0:
fidx = min(int(round(s * fps)), max(0, frame_count - 1))
cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
else:
cap.set(cv2.CAP_PROP_POS_MSEC, s * 1000.0)
ok, frame = cap.read()
if not ok or frame is None:
# 取得できなければ空行(8列空)を入れて継続
writer.writerow([""] * 8)
print(f"[{s:>3}s] frame read failed -> empty row")
continue
crop = frame[y1:y2, x1:x2]
try:
mess_value_n = n_categorizer(crop)
except Exception as e:
mess_value_n = ""
print(f"[{s:>3}s] inference error: {e}")
# 8列目だけ値、1〜7列は空
row = [""] * 7 + [mess_value_n]
# 1列目に "x,y,z,x_quat,y_quat,z_quat,w_quat,speed" を固定で入れ、
# 2列目以降に row を入れる
csv_row = row
writer.writerow(csv_row)
print(f"[{s:>3}s] fidx={fidx if fidx is not None else '-'} -> mess_value_n={mess_value_n})
cap.release()
print(f"\nCSV saved to: {CSV_PATH}")
if __name__ == "__main__":
main()
上記のコードでは以下のように動画データの中の速度を表す箇所だけROIで切り取り、そこから速度を読み取り、CSVを返している
おわりに
前回記事内で触れていなかった速度の類推に関してこちらの記事で触れた。