こちらで記載した記事のデータ収集編。
AWSIMを使ったデータの収集方法
まず前段階として以下記事を参考にしてOptunaを実装する(調整するパラメータは完走できるものならなんでもOK)
次に以下記事を参考にしてXY_Loggerと呼ばれる走行カートの位置情報をCSVファイルに落とし込むパッケージを実装する
この2点が実装されたら前準備は完了。
さっそくOptunaを回していくと動画データとCSVデータが勝手にもりもり溜まっていくはず。
データの仕分け
次に溜まったデータを学習データに変換するための前処理を行っていく。
イメージとしては以下のディレクトリに動画データから切り出した画像ファイルを仕分けしていくイメージ。
root
|-x
|-8633...(座標データをもとにフォルダ名をつけている)
|-8634...
|-8635...
...
|-y
|-43117...
|-43118...
...
|-z
|-...
|-z_quat
|-...
|-w_quat
|-...
以下上記の仕分けを行うコード
ROOTは動画ファイル、CSVファイル郡が格納されているフォルダだが、事前にCSVファイルはAWSIMが起動する毎に出力されるoutputフォルダに毎回必ず格納する必要がある点に注意(要するに同じフォルダにないとCSVファイルと動画データの仕分けが機能しない)
import os
import re
import cv2
import math
import pandas as pd
from pathlib import Path
import pandas as pd
import cv2 # only if you’ll open videos
from pathlib import Path
ROOT = Path("/path/to/autoaichallenge_data") # 動画ファイル、CSVファイル郡が格納されているフォルダ
# ====== 出力先 ======
OUT_DIR = r"/path/to/output" #仕分けされた画像ファイルを出力する場所
EXT = ".png" # 保存拡張子(.png 推奨)
# ====== 切り出し間隔(秒)======
INTERVAL_SEC = 1
# ====== 行番号とフォルダ名の対応 (1始まり) ======
mapping = {
1: "x",
2: "y",
# 3: "z",
# 5: "w_quat",
# 6: "z_quat"
}
def iter_projects(root: Path):
for proj in root.iterdir():
if not proj.is_dir():
continue
data_dir = proj
cap_dir = proj / "capture"
if data_dir.is_dir() and cap_dir.is_dir():
csvs = sorted(data_dir.glob("*.csv"))
mp4s = sorted(cap_dir.glob("*.mp4"))
if csvs and mp4s:
yield proj, csvs, mp4s
def save_frame_range_sec(video_path, start_sec, stop_sec, step_sec,
dir_path, basename, ext='jpeg'):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ Could not open {video_path}")
return
# 書き過ぎ防止(例:50枚を超えていたらスキップ)
file_count = len([f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path,f))])
if file_count > 50:
print(f"⚠ Skip {dir_path}: already has {file_count} files (>50).")
cap.release()
return
#os.makedirs(dir_path, exist_ok=True)
basename = basename.replace("/", "_").replace("\\", "_")
print("basename:" , basename)
base_path = os.path.join(dir_path, basename)
print("base_path:", base_path)
digit = len(str(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))))
fps = cap.get(cv2.CAP_PROP_FPS)
fps_inv = 1 / fps
sec = start_sec
while sec < stop_sec:
n = round(fps * sec)
cap.set(cv2.CAP_PROP_POS_FRAMES, n)
ret, frame = cap.read()
if ret:
cv2.imwrite(
'{}_{}_{:.2f}.{}'.format(
base_path, str(n).zfill(digit), n * fps_inv, ext
),
frame
)
else:
break
sec += step_sec
cap.release()
def create_folders(CSV_PATH, VIDEO_PATH):
# 出力基準ディレクトリを作成
os.makedirs(OUT_DIR, exist_ok=True)
df = pd.read_csv(CSV_PATH, skiprows = 1, header=None)
print(df)
print(len(df))
# マッピングに基づいてフォルダ作成
for row_num, folder_name in mapping.items():
print(folder_name)
start = 0
#print(start)
for row in df.itertuples(index=False):
x, y, z, z_quat, w_quat = round(row[0]), round(row[1]), round(row[2], 2), round(row[5], 2), round(row[6],2)
x, y, z, z_quat, w_quat = str(x), str(y), str(z), str(z_quat), str(w_quat)
#print(x, y, z, z_quat, w_quat)
if 1 <= row_num <= len(df):
if folder_name == "x":
path = os.path.join(OUT_DIR + '/' + folder_name + '/', x)
os.makedirs(path, exist_ok=True)
print(f"✅ 作成しました: {path}")
save_frame_range_sec(VIDEO_PATH, start, start + 1, 1, path, path)
elif folder_name == "y":
path = os.path.join(OUT_DIR + '/' + folder_name + '/', y)
os.makedirs(path, exist_ok=True)
print(f"✅ 作成しました: {path}")
save_frame_range_sec(VIDEO_PATH, start, start + 1, 1, path, path)
elif folder_name == "z":
path = os.path.join(OUT_DIR + '/' + folder_name + '/', z)
os.makedirs(path, exist_ok=True)
save_frame_range_sec(VIDEO_PATH, start, start + 1, 1, path, path)
print(f"✅ 作成しました: {path}")
elif folder_name == "z_quat":
path = os.path.join(OUT_DIR + '/' + folder_name + '/', z_quat)
os.makedirs(path, exist_ok=True)
print(f"✅ 作成しました: {path}")
save_frame_range_sec(VIDEO_PATH, start, start + 1, 1, path, path)
else:
path = os.path.join(OUT_DIR + '/' + folder_name + '/', w_quat)
os.makedirs(path, exist_ok=True)
print(f"✅ 作成しました: {path}")
save_frame_range_sec(VIDEO_PATH, start, start + 1, 1, path, path)
else:
print(f"⚠ 行 {row_num} はCSVに存在しません。スキップしました。")
start += 1
print(start)
def main():
any_found = False
for proj, csvs, mp4s in iter_projects(ROOT):
any_found = True
print(f"[{proj.name}] CSVs: {len(csvs)}, MP4s: {len(mp4s)}")
for csv_path, mp4_path in zip(csvs, mp4s):
#df = pd.read_csv(csv_path, header=None) # add skiprows=1 if needed
# do your processing with df & mp4_path here
print("mp4 path:", mp4_path)
create_folders(csv_path, mp4_path)
if not any_found:
print(f"No projects found under {ROOT} with xy_log_*.csv")
if __name__ == "__main__":
main()
上記コードを起動することで仕分けが完了する。
あとは以下記事に沿って仕分けされたデータをnpyファイルに変換すれば機械学習を回すための学習データとラベルデータの作成は完了