0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

以前「機械学習で動画データから座標を取ってみよう」というテーマで記事を書いたが、CSVファイルにspeedの項目をどのように学習し、データを返すかを記述していなかったのでこちらで補足

以前の記事

前提

こちらの記事は以下の3つのパートで構成される。
1どうやって学習を行うための学習データ、ラベルデータを取得するか
2学習データ、ラベルデータからどのようにAIに学習させるか
3学習させたデータを使ってどのように速度データを返していくか

1どうやって学習を行うための学習データ、ラベルデータを取得するか

まずはAIに学習させるために画像データをnpyファイルに変換する必要があるのでそちらの前準備。
手順としては予めスクショで以下のように画像データを保存しておく
スクリーンショット 2025-08-27 182244.png

スクリーンショット 2025-08-27 182253.png

...

スクリーンショット 2025-08-27 182051.png

今回は30km/h~37km/hを想定しているので一旦こちらの7枚
その後30~37の7つのフォルダを作成しそれぞれに対応する画像を保存する。
これでnpyファイルに変換する前準備はOK。
フォルダ構成は以下のようにしておく

root
 |-py or jpynbファイル(npyに変換するコード)
 |- 1_data
  |- new (npyファイルを格納するフォルダ)
 |-30(対応する画像を入れるフォルダ)
 |-31
 |-32
 ...
 └-37

こちら準備が出来たら以下のコードを実行(詳細は前回記事である以下を参照)

# -*- coding: utf-8 -*-
import os
from pathlib import Path
import json
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical  # np_utils 互換
from tensorflow.keras.utils import load_img, img_to_array, array_to_img

# ========= 設定 =========
ROOT_DIR = Path(r"/path/to/root")   # ←当コードが格納されているフォルダ
OUT_DIR  = Path(r"/path/to/1_data/new")  # npy等の出力先
OUT_DIR.mkdir(parents=True, exist_ok=True)

IMG_SIZE = (64, 64)
COLOR_MODE = "grayscale"      # "rgb" か "grayscale"。今回読み込むのは数字なのでgrayscaleでOK
DTYPE = "float32"             # 出力配列のdtype
FILE_PATTERNS = ("*.jpeg", "*.jpg", "*.png")  # 読み込む拡張子

# 水増し設定(0なら水増しなし。今回は各フォルダ1000水増ししたいので1000を入力)
AUG_PER_IMAGE = 100
datagen = ImageDataGenerator(
    rotation_range=0,
    width_shift_range=0.0,
    height_shift_range=0.0,
    zoom_range=0.1,
    # horizontal_flip=True,
    # vertical_flip=True,
)

# ========= ここから処理 =========
def iter_image_paths(folder: Path):
    for pat in FILE_PATTERNS:
        for p in folder.glob(pat):
            if p.is_file():
                yield p

def load_image_as_array(path: Path):
    # Keras load_img: color_mode="grayscale" | "rgb"
    img = load_img(str(path), color_mode=COLOR_MODE, target_size=IMG_SIZE)
    arr = img_to_array(img)  # HxWx1 or HxWx3
    return arr

def main():
    X_list = []
    y_list = []
    class_index = {}   # {"folder_name": class_id}

    # サブフォルダをクラスとして走査
    subfolders = sorted([p for p in ROOT_DIR.iterdir() if p.is_dir()])
    #print(subfolders)
    #subfolders = dic_x
    if not subfolders:
        raise RuntimeError(f"No subfolders found under: {ROOT_DIR}")

    for cid, cls_dir in enumerate(subfolders):
        class_index[cls_dir.name] = cid
        print("cid: " + str(cid))
        imgs = list(iter_image_paths(cls_dir))
        if not imgs:
            print(f"[WARN] no images in: {cls_dir}")
            continue

        print(f"[{cls_dir.name}] files={len(imgs)}  -> class_id={cid}")

        for img_path in imgs:
            try:
                arr = load_image_as_array(img_path)  # (H, W, C)
            except Exception as e:
                print(f"  [SKIP] {img_path} -> {e}")
                continue

            X_list.append(arr)
            y_list.append(cid)

            # ---- 水増し(メモリ内)----
            if AUG_PER_IMAGE > 0:
                x = np.expand_dims(arr, axis=0)  # (1, H, W, C)
                g = datagen.flow(x, batch_size=1, shuffle=False)
                for _ in range(AUG_PER_IMAGE):
                    aug = next(g)[0]  # (H, W, C)
                    X_list.append(aug)
                    y_list.append(cid)

    if not X_list:
        raise RuntimeError("No images collected. Check ROOT_DIR / patterns / structure.")

    X = np.stack(X_list).astype(DTYPE)   # (N, H, W, C)
    y = np.array(y_list, dtype=np.int64)
    y = to_categorical(y, num_classes=len(class_index))

    # 正規化など必要ならここで
    # X /= 255.0

    # 保存
    np.save(OUT_DIR / "train_data_n.npy", X) #速度を表す数字の学習データを作成
    np.save(OUT_DIR / "train_label_n.npy", y) #速度を表す数字のラベルデータを作成
    with open(OUT_DIR / "class_index_n.json", "w", encoding="utf-8") as f:
        json.dump(class_index, f, ensure_ascii=False, indent=2)

    print(f"Saved: X.npy shape={X.shape}, dtype={X.dtype}")
    print(f"       y.npy shape={y.shape}, num_classes={len(class_index)}")
    print(f"       class_index -> {OUT_DIR / 'class_index.json'}")

if __name__ == "__main__":
    main()

以上でnpyファイルの準備は完了。

2学習データ、ラベルデータからどのようにAIに学習させるか

前回記事からtrain_data, label_dataの箇所だけを以下のように変更

train_data = np.load("/path/to/1_data/new/train_data_n.npy", allow_pickle=True)
train_data = np.transpose(train_data, (0, 3, 1, 2))
train_label = np.load("/path/to/1_data/new/train_label_n.npy", allow_pickle=True)
print(train_data.shape)
print(train_label.shape)

それ以外は前回記事と同様
なお、フォルダは2で作成したrootディレクトリ直下に入れる。

3学習させたデータを使ってどのように速度データを返していくか

2で得たpickleデータを使って以下のように速度データを類推するコードを作成

def n_categorizer(img: np.ndarray):
    with open("/path/to/trajectory_n_model.pickle", "rb") as f:
        model = pickle.load(f)

    categories = [30, 31, 32, 33, 34, 35, 36, 37]

    if img is None:
        raise ValueError("img is None. Pass a valid cropped image array.")

    if img.ndim == 3:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    elif img.ndim == 2:
        gray = img
    else:
        raise ValueError(f"Unsupported img shape: {img.shape}")

    gray = cv2.resize(gray, (64, 64), interpolation=cv2.INTER_AREA)

    x = gray.astype(np.float32)
    x = np.expand_dims(x, axis=(0, -1))     # (1,64,64,1)
    x = np.transpose(x, (0, 3, 1, 2))       # (1,1,64,64)  NCHW

    features = model.predict(x)

    # --- 分類ロジック:最もスコアが高いクラスを採用(argmax) ---
    if features.ndim == 2 and features.shape[1] == len(categories):
        idx = int(np.argmax(features[0]))   # しきい値は使わず常にargmax
        cat = categories[idx]
    else:
        # 回帰的出力など:最も近いカテゴリへ丸め
        val = float(np.ravel(features)[0])
        idx = int(np.argmin([abs(val - c) for c in categories]))
        cat = categories[idx]

    mess = cat // 3.6 #軌跡データを作成することを想定し時速を分速に変換
    return mess

上記の速度類推の関数を以下のように活用する

def main():
    # ==== 設定 ====
    VIDEO_PATH = str(input("Enter video path: "))  # 入力動画パス
    STEP_SEC = 1                                   # 1秒ごと
    START_SEC = 0                                 # 開始:60s(含む)
    END_SEC   = 40                                # 終了:100s(含む)

    # ROI(右上)
    ROI_WIDTH_PCT    = 0.15
    ROI_HEIGHT_PCT   = 0.20
    MARGIN_RIGHT_PCT = 0.01
    MARGIN_TOP_PCT   = 0.02

    # 出力先(権限問題を避けたい場合は ~/ を推奨)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    OUT_DIR = os.path.expanduser(f"/home/kota/v2t/crops_top_right_{ts}")
    os.makedirs(OUT_DIR, exist_ok=True)
    CSV_PATH = os.path.join(OUT_DIR, "result.csv")

    # ==== 動画情報 ====
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        raise FileNotFoundError(f"Cannot open: {VIDEO_PATH}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
    if fps <= 0:
        fps = 30.0
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 else -1
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration_sec = (frame_count / fps) if frame_count > 0 else 0

    # 60〜100秒の範囲を動画長にクランプ
    start_s = max(0, START_SEC)
    end_s   = min(int(duration_sec), END_SEC)
    if end_s < start_s:
        raise ValueError(f"Video too short for requested range: {duration_sec:.2f}s")

    # ==== ROI ====
    x2 = int(w * (1.0 - MARGIN_RIGHT_PCT))
    x1 = max(0, x2 - int(w * ROI_WIDTH_PCT))
    y1 = int(h * MARGIN_TOP_PCT)
    y2 = min(h, y1 + int(h * ROI_HEIGHT_PCT))
    x1 = max(0, min(x1, w-1)); x2 = max(1, min(x2, w))
    y1 = max(0, min(y1, h-1)); y2 = max(1, min(y2, h))

    # デバッグ:最初のフレームにROIを描画
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    ok, first = cap.read()
    if ok and first is not None:
        dbg = first.copy()
        cv2.rectangle(dbg, (x1, y1), (x2, y2), (0,255,0), 2)
        cv2.imwrite(os.path.join(OUT_DIR, "debug_roi_on_first_frame.png"), dbg)

    # ==== 60〜100秒を1秒ごとに処理し、各行を出力 ====
    with open(CSV_PATH, "w", newline="", encoding="utf-8") as fcsv:
        writer = csv.writer(fcsv)

        # 必要ならヘッダーも書く(任意)
        header = [
            "x", "y", "z",
            "x_quat", "y_quat", "z_quat", "w_quat",
            "speed"
        ]
        # ここに row の列名を足したい場合は header + [...] として書けます
        writer.writerow(header)

        for s in range(start_s, end_s + 1, STEP_SEC):  # 両端含む
            # 指定秒のフレームへ
            fidx = None
            if frame_count > 0:
                fidx = min(int(round(s * fps)), max(0, frame_count - 1))
                cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
            else:
                cap.set(cv2.CAP_PROP_POS_MSEC, s * 1000.0)

            ok, frame = cap.read()
            if not ok or frame is None:
                # 取得できなければ空行(8列空)を入れて継続
                writer.writerow([""] * 8)
                print(f"[{s:>3}s] frame read failed -> empty row")
                continue

            crop = frame[y1:y2, x1:x2]

            try:
                mess_value_n = n_categorizer(crop)
            except Exception as e:
                mess_value_n = ""

                print(f"[{s:>3}s] inference error: {e}")

            # 8列目だけ値、1〜7列は空
            row = [""] * 7 + [mess_value_n]
                    # 1列目に "x,y,z,x_quat,y_quat,z_quat,w_quat,speed" を固定で入れ、
            # 2列目以降に row を入れる
            csv_row = row

            writer.writerow(csv_row)
            print(f"[{s:>3}s] fidx={fidx if fidx is not None else '-'} -> mess_value_n={mess_value_n})

    cap.release()
    print(f"\nCSV saved to: {CSV_PATH}")

if __name__ == "__main__":
    main()

上記のコードでは以下のように動画データの中の速度を表す箇所だけROIで切り取り、そこから速度を読み取り、CSVを返している

debug_roi_on_first_frame.png

おわりに

前回記事内で触れていなかった速度の類推に関してこちらの記事で触れた。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?