この記事でできること
- 設定モードでROI指定&OK/NG撮影保存 → 学習(Keras→TFLite)
- 出力モードで常時推論 → 画面にOK/NG表示 → PLCへOK=1/NG=0送信
依存:opencv-python, tensorflow(or tflite-runtime), pymodbus, Pillow, numpy, tkinter
プロンプト
OKとNGの画像をフォルダに入れて、tfliteで学習するプログラムをraspi用に作って
出力モードと設定モードをGUIで選択できるようにして
設定モード中は、
Okとng の画像を取得でき、OK/ng画像はROIで設定した位置をくり抜くように。
そして学習ボタンを押したらmodelを出力するように。
出力モード中は画像を取得し、判定結果の出力をwhileで常時出力して
また、Okを1、NGを0でPLCに出力して
ChatGPTで出力した実際のコード全文
下記のようなコードを出力してくれる。これを.pyとしてPCに保存し,scp コマンドでPCからRaspiにファイル転送すればOK。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# OK/NG 2クラス分類をRaspberry Piで
# - 設定モード:ROI指定、OK/NG撮影保存、学習(Keras→TFLite)
# - 出力モード:常時推論、画面表示、PLCへOK=1/NG=0出力(TCP or Modbus/TCP)
# インデントはスペース2
import os
import cv2
import json
import time
import socket
import queue
import threading
import numpy as np
from datetime import datetime
from pathlib import Path
from PIL import Image, ImageTk
import tkinter as tk
from tkinter import ttk, messagebox
# TensorFlow / TFLite
# Prefer the lightweight tflite-runtime interpreter; fall back to the one
# bundled with full TensorFlow when tflite-runtime is not installed.
USE_TFLITE_RUNTIME = False
try:
  import tflite_runtime.interpreter as tflite
  USE_TFLITE_RUNTIME = True
except Exception:
  from tensorflow.lite import Interpreter # fallback
  tflite = None
# Training (Keras). Optional: a device that only has tflite-runtime must still
# be able to start and run inference; only the training step needs TensorFlow.
try:
  import tensorflow as tf
  from tensorflow import keras
  from tensorflow.keras import layers
except Exception as e:
  print(f"[startup] TensorFlow unavailable, training is disabled: {e}")
  tf = keras = layers = None
# Modbus (optional)
try:
  from pymodbus.client import ModbusTcpClient
  HAVE_PYMODBUS = True
except Exception:
  HAVE_PYMODBUS = False
# -------------------------------
# Constants and paths
# -------------------------------
APP_TITLE = "OK/NG TFLite Trainer & Runner (Raspberry Pi)"
# Dataset layout: data/OK and data/NG hold the cropped sample images
DATA_DIR = Path("data")
OK_DIR = DATA_DIR / "OK"
NG_DIR = DATA_DIR / "NG"
# Files written by training (model, labels) and by the GUI (config)
MODEL_PATH = Path("model.tflite")
LABELS_PATH = Path("labels.json")
CONFIG_PATH = Path("config.json")
# Image preprocessing: inputs are resized to IMG_SIZE x IMG_SIZE
IMG_SIZE = 128
CHANNELS = 1 # grayscale
# Passed to model.fit() during training
BATCH_SIZE = 16
EPOCHS = 10
# Inference display / output: sleep between inference iterations, in seconds
INFER_INTERVAL_SEC = 0.05
# -------------------------------
# ユーティリティ
# -------------------------------
def ensure_dirs():
  """Create the dataset root and the OK/NG sample folders if they are missing."""
  for folder in (DATA_DIR, OK_DIR, NG_DIR):
    folder.mkdir(parents=True, exist_ok=True)
def now_str():
  """Return the current local time as a ``YYYYmmdd_HHMMSS_ffffff`` string."""
  stamp = datetime.now()
  return stamp.strftime("%Y%m%d_%H%M%S_%f")
def load_config():
  """Load the application settings from ``CONFIG_PATH``.

  The stored values are merged over the built-in defaults, so the returned
  dict always contains every default key (including the nested ``plc_tcp`` /
  ``plc_modbus`` entries) even when the file is partial or was written by an
  older version. A missing, unreadable or malformed file yields the defaults.

  Returns:
    dict: the effective configuration.
  """
  # Defaults
  cfg = {
    "camera_index": 0,
    "roi": None,  # [x, y, w, h]
    "plc_mode": "none",  # "none" | "tcp" | "modbus"
    "plc_tcp": {"ip": "192.168.0.10", "port": 8501},
    "plc_modbus": {"ip": "192.168.0.10", "port": 502, "unit_id": 1, "coil_addr": 0},
    "ok_index": 1,  # label index: OK=1
    "ng_index": 0   # label index: NG=0
  }
  if not CONFIG_PATH.exists():
    return cfg
  try:
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
      loaded = json.load(f)
  except (OSError, ValueError) as e:
    # ValueError covers both JSON syntax errors and bad UTF-8
    print(f"[config] could not read {CONFIG_PATH}: {e}; using defaults")
    return cfg
  if not isinstance(loaded, dict):
    print(f"[config] {CONFIG_PATH} is not a JSON object; using defaults")
    return cfg
  for key, value in loaded.items():
    base = cfg.get(key)
    if isinstance(base, dict):
      # Nested section: overlay stored keys, keep defaults for missing ones.
      # A non-dict value here is malformed and is ignored.
      if isinstance(value, dict):
        base.update(value)
    else:
      cfg[key] = value
  return cfg
def save_config(cfg):
  """Write ``cfg`` to ``CONFIG_PATH`` as indented UTF-8 JSON (non-ASCII kept as-is)."""
  with open(CONFIG_PATH, mode="w", encoding="utf-8") as out:
    json.dump(cfg, out, indent=2, ensure_ascii=False)
def crop_roi(frame, roi):
  """Return the part of ``frame`` covered by ``roi`` (``[x, y, w, h]``).

  The rectangle is clipped to the image bounds. ``frame`` itself is returned
  when ``roi`` is None or when the clipped rectangle is empty.
  """
  if roi is None:
    return frame
  left, top, width, height = roi
  rows, cols = frame.shape[:2]

  def clamp(value, upper):
    # Limit value to the closed range [0, upper]
    return max(0, min(upper, value))

  col_start, col_stop = clamp(left, cols), clamp(left + width, cols)
  row_start, row_stop = clamp(top, rows), clamp(top + height, rows)
  if col_stop <= col_start or row_stop <= row_start:
    return frame
  return frame[row_start:row_stop, col_start:col_stop]
def preprocess(img_bgr):
  """Convert an (already ROI-cropped) BGR image into the model input array.

  Steps: BGR -> grayscale -> resize to IMG_SIZE x IMG_SIZE -> float32 scaled
  by 1/255 -> trailing channel axis added, giving shape (H, W, 1).
  """
  target = (IMG_SIZE, IMG_SIZE)
  mono = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
  small = cv2.resize(mono, target, interpolation=cv2.INTER_AREA)
  scaled = small.astype(np.float32) / 255.0
  return scaled[..., np.newaxis]
def to_tk_image(frame_bgr):
  """Turn an OpenCV BGR frame into an ``ImageTk.PhotoImage`` for a Tk canvas."""
  as_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
  return ImageTk.PhotoImage(image=Image.fromarray(as_rgb))
# -------------------------------
# PLC 出力
# -------------------------------
class PLCOutput:
  """Sends the OK/NG judgement (1/0) to a PLC.

  The transport is chosen on every call from the config dict returned by
  ``cfg_getter``:
    - ``"none"``   : nothing is sent
    - ``"tcp"``    : the value is sent as one ASCII string over a new TCP connection
    - ``"modbus"`` : the value is written to a single coil over Modbus/TCP
  """

  def __init__(self, cfg_getter):
    """Store ``cfg_getter``, a zero-argument callable returning the current config dict."""
    self.cfg_getter = cfg_getter
    self.modbus_client = None
    # (ip, port) the cached Modbus client was created for
    self._modbus_endpoint = None

  def send_value(self, val: int):
    """Send ``val`` with the configured transport.

    Returns:
      bool: True on success (or when output is disabled), False on any
      transport failure or when the mode is unknown.
    """
    cfg = self.cfg_getter()
    mode = cfg.get("plc_mode", "none")
    if mode == "none":
      return True
    if mode == "tcp":
      return self._send_tcp(cfg["plc_tcp"], val)
    if mode == "modbus":
      return self._send_modbus(cfg["plc_modbus"], val)
    return False

  def _send_tcp(self, tcp_cfg, val):
    """Open a short-lived TCP connection and send ``str(val)`` as ASCII."""
    ip = tcp_cfg["ip"]
    port = int(tcp_cfg["port"])
    try:
      with socket.create_connection((ip, port), timeout=1.0) as s:
        s.sendall(str(val).encode("ascii"))
      return True
    except (OSError, ValueError) as e:
      print(f"[PLC TCP] send error: {e}")
      return False

  def _send_modbus(self, mb_cfg, val):
    """Write ``bool(val)`` to the configured coil, (re)connecting when needed."""
    if not HAVE_PYMODBUS:
      print("[PLC Modbus] pymodbus not installed.")
      return False
    ip = mb_cfg["ip"]
    port = int(mb_cfg["port"])
    unit_id = int(mb_cfg["unit_id"])
    coil = int(mb_cfg["coil_addr"])
    try:
      client = self._get_modbus_client(ip, port)
      if client is None:
        return False
      # Write the coil as True/False
      keyword = self._unit_keyword(client.write_coil)
      rr = client.write_coil(coil, bool(val), **{keyword: unit_id})
      if rr.isError():
        print(f"[PLC Modbus] write error: {rr}")
        return False
      return True
    except Exception as e:
      print(f"[PLC Modbus] send error: {e}")
      # Drop the client so the next call starts from a fresh connection
      self._drop_modbus_client()
      return False

  def _get_modbus_client(self, ip, port):
    """Return a connected client for (ip, port), or None when connecting fails."""
    if self.modbus_client is not None and self._modbus_endpoint != (ip, port):
      # The address was changed in the settings: the cached client is stale
      self._drop_modbus_client()
    if self.modbus_client is None:
      client = ModbusTcpClient(host=ip, port=port)
      if not client.connect():
        print(f"[PLC Modbus] connect failed: {ip}:{port}")
        client.close()
        return None
      self.modbus_client = client
      self._modbus_endpoint = (ip, port)
    return self.modbus_client

  def _drop_modbus_client(self):
    """Close and forget the cached Modbus client (best effort)."""
    client, self.modbus_client = self.modbus_client, None
    self._modbus_endpoint = None
    if client is not None:
      try:
        client.close()
      except Exception as e:
        # Closing a broken connection may itself fail; nothing more to do
        print(f"[PLC Modbus] close error: {e}")

  @staticmethod
  def _unit_keyword(write_coil):
    """Pick the keyword that addresses the Modbus unit for this pymodbus version.

    The parameter was called ``unit`` (passed through ``**kwargs``) in older
    releases and later became ``slave`` and then ``device_id``; inspecting the
    signature avoids passing a keyword the installed version does not accept.
    """
    import inspect
    try:
      params = inspect.signature(write_coil).parameters
    except (TypeError, ValueError):
      return "unit"
    for name in ("device_id", "slave", "unit"):
      if name in params:
        return name
    return "unit"
# -------------------------------
# 学習(Keras→TFLite)
# -------------------------------
def load_dataset():
  """Load every OK/NG sample image and return it as training arrays.

  Reads ``*.jpg`` and ``*.png`` files from ``NG_DIR`` (label 0) and ``OK_DIR``
  (label 1), runs each through ``preprocess`` and stacks the results. Files
  that OpenCV cannot decode are skipped.

  Returns:
    tuple: ``(X, y)`` where ``X`` stacks the preprocessed images along axis 0
    and ``y`` is an int32 label array, or ``(None, None)`` when no image
    could be loaded.
  """
  xs = []
  ys = []
  # Label indices are fixed (NG=0, OK=1); the UI relies on the same mapping
  for label_idx, folder in ((0, NG_DIR), (1, OK_DIR)):
    for pattern in ("*.jpg", "*.png"):
      # glob() order depends on the filesystem; sorting keeps the sample
      # order (and therefore the seeded train/validation split) reproducible
      for path in sorted(folder.glob(pattern)):
        img = cv2.imread(str(path))
        if img is None:
          continue  # unreadable or not an image
        xs.append(preprocess(img))
        ys.append(label_idx)
  if not xs:
    return None, None
  X = np.stack(xs, axis=0)
  y = np.array(ys, dtype=np.int32)
  return X, y
def build_model():
  """Build and compile the small CNN used for OK/NG classification.

  Input is an (IMG_SIZE, IMG_SIZE, CHANNELS) image; output is a 2-way softmax
  ordered [NG, OK]. Compiled with Adam and sparse categorical cross-entropy.
  """
  image_in = keras.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
  feat = image_in
  # Two conv + pool stages, then a final conv without pooling
  for filters in (16, 32):
    feat = layers.Conv2D(filters, 3, activation="relu")(feat)
    feat = layers.MaxPooling2D()(feat)
  feat = layers.Conv2D(64, 3, activation="relu")(feat)
  feat = layers.GlobalAveragePooling2D()(feat)
  feat = layers.Dense(32, activation="relu")(feat)
  # [NG, OK] = [0, 1]
  class_probs = layers.Dense(2, activation="softmax")(feat)
  net = keras.Model(image_in, class_probs)
  net.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
  )
  return net
def train_and_export_tflite(status_cb=lambda s: None):
  """Train the OK/NG classifier and export it to ``MODEL_PATH`` as TFLite.

  Also writes the index-to-label mapping to ``LABELS_PATH``.

  Args:
    status_cb: callable taking one progress message string.

  Raises:
    RuntimeError: when no training image is available.
  """
  X, y = load_dataset()
  if X is None:
    raise RuntimeError("データがありません。OK/NG画像を保存してください。")
  # Shuffle (fixed seed) and split 80/20
  rng = np.random.default_rng(42)
  idx = np.arange(len(X))
  rng.shuffle(idx)
  X = X[idx]
  y = y[idx]
  n_train = max(1, int(len(X) * 0.8))
  Xtr, ytr = X[:n_train], y[:n_train]
  if len(X) > n_train:
    Xva, yva = X[n_train:], y[n_train:]
  else:
    # Too few samples to hold any out: validate on the whole set so that
    # fit() still receives non-empty validation data
    Xva, yva = X, y
  # Train
  status_cb(f"学習開始: {Xtr.shape[0]}枚(検証: {Xva.shape[0]}枚)")
  model = build_model()
  hist = model.fit(
    Xtr, ytr,
    validation_data=(Xva, yva),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    verbose=2
  )
  # Convert to TFLite
  status_cb("TFLiteへ変換中…")
  converter = tf.lite.TFLiteConverter.from_keras_model(model)
  # Size/latency optimization (optional)
  converter.optimizations = [tf.lite.Optimize.DEFAULT]
  tflite_model = converter.convert()
  with open(MODEL_PATH, "wb") as f:
    f.write(tflite_model)
  # Save labels (index: 0=NG, 1=OK)
  with open(LABELS_PATH, "w", encoding="utf-8") as f:
    json.dump({"0":"NG","1":"OK"}, f, ensure_ascii=False, indent=2)
  # Final validation accuracy; the metric may be absent from the history,
  # in which case it is simply left out of the message
  val_history = hist.history.get("val_accuracy")
  if val_history:
    val_acc = val_history[-1]
    status_cb(f"完了: model.tflite 出力(val_acc={val_acc:.3f})")
  else:
    status_cb("完了: model.tflite 出力")
# -------------------------------
# 推論(TFLite)
# -------------------------------
class TFLiteRunner:
  """Loads a TFLite model and runs single-frame OK/NG inference."""

  def __init__(self):
    # All three are populated by load()
    self.interpreter = None
    self.input_details = None
    self.output_details = None

  def load(self, model_path=MODEL_PATH):
    """Create the interpreter for ``model_path`` and allocate its tensors.

    Raises:
      RuntimeError: when the model file does not exist.
    """
    if not Path(model_path).exists():
      raise RuntimeError("model.tflite がありません。学習してください。")
    # Use tflite-runtime when it was importable, otherwise TensorFlow's interpreter
    interpreter_cls = tflite.Interpreter if USE_TFLITE_RUNTIME else Interpreter
    self.interpreter = interpreter_cls(model_path=str(model_path))
    self.interpreter.allocate_tensors()
    self.input_details = self.interpreter.get_input_details()
    self.output_details = self.interpreter.get_output_details()

  def infer(self, img_bgr, roi):
    """Classify the ROI of ``img_bgr``.

    The model is loaded from ``MODEL_PATH`` on first use if ``load`` was not
    called yet.

    Returns:
      tuple: ``(pred_idx, ok_prob, ng_prob, crop)`` where ``pred_idx`` is the
      argmax of the output (0=NG, 1=OK) and ``crop`` is the ROI image used.
    """
    if self.interpreter is None:
      self.load(MODEL_PATH)
    crop = crop_roi(img_bgr, roi)
    # Add the batch axis: (1, H, W, 1)
    batch = preprocess(crop)[np.newaxis, ...]
    in_spec = self.input_details[0]
    if in_spec["dtype"] == np.float32:
      batch = batch.astype(np.float32)
    else:
      # e.g. uint8-quantized input (simplified handling: scale back to 0-255)
      batch = (batch * 255).astype(np.uint8)
    self.interpreter.set_tensor(in_spec["index"], batch)
    self.interpreter.invoke()
    out_index = self.output_details[0]["index"]
    # Softmax output expected: shape (1, 2) = [NG, OK]
    probs = self.interpreter.get_tensor(out_index)[0]
    ng_prob = float(probs[0])
    ok_prob = float(probs[1])
    return int(np.argmax(probs)), ok_prob, ng_prob, crop
# -------------------------------
# GUIアプリ
# -------------------------------
class App:
  """Tk GUI with a setting mode and an output mode.

  Setting mode: draw the ROI, save OK/NG samples, train and export the model.
  Output mode: run inference continuously, overlay OK/NG on the preview and
  send OK=1 / NG=0 to the PLC.

  Threading: Tk widgets and variables are only touched from the thread that
  built the UI. The training and inference worker threads hand UI work over
  through ``_ui_queue`` (drained by ``update_loop``), read the PLC settings
  from a snapshot refreshed on the UI thread, and share the camera through
  ``_cap_lock``.
  """

  def __init__(self, root):
    """Build the window on ``root``, open the camera and start the preview loop."""
    self.root = root
    self.root.title(APP_TITLE)
    ensure_dirs()
    self.cfg = load_config()
    # Cross-thread plumbing; must exist before update_loop or any worker runs
    self._ui_thread = threading.current_thread()
    self._ui_queue = queue.Queue()
    self._cap_lock = threading.Lock()
    self._infer_thread = None
    self._train_thread = None
    self._plc_snapshot = {"plc_mode": "none"}
    # Canvas item that shows the preview; created on the first frame
    self._canvas_image_id = None
    # Camera
    self.cap = cv2.VideoCapture(self.cfg.get("camera_index", 0))
    if not self.cap.isOpened():
      messagebox.showerror("Camera", "カメラをオープンできませんでした。camera_indexを確認してください。")
    # Preview canvas
    self.canvas = tk.Canvas(root, width=640, height=480, bg="black")
    self.canvas.grid(row=0, column=0, rowspan=20, sticky="nsew", padx=4, pady=4)
    # ROI state
    self.roi = self.cfg.get("roi", None)  # [x, y, w, h]
    self.dragging = False
    self.drag_start = None
    self.canvas.bind("<ButtonPress-1>", self.on_mouse_down)
    self.canvas.bind("<B1-Motion>", self.on_mouse_drag)
    self.canvas.bind("<ButtonRelease-1>", self.on_mouse_up)
    # Right-hand panel
    panel = ttk.Frame(root)
    panel.grid(row=0, column=1, sticky="ns", padx=6, pady=6)
    # Mode selection
    self.mode_var = tk.StringVar(value="setting")
    ttk.Label(panel, text="モード").grid(row=0, column=0, sticky="w")
    ttk.Radiobutton(panel, text="設定モード", variable=self.mode_var, value="setting",
                    command=self.on_mode_change).grid(row=1, column=0, sticky="w")
    ttk.Radiobutton(panel, text="出力モード", variable=self.mode_var, value="output",
                    command=self.on_mode_change).grid(row=2, column=0, sticky="w")
    ttk.Separator(panel).grid(row=3, column=0, sticky="ew", pady=4)
    # Sample capture buttons
    ttk.Label(panel, text="撮影/保存(ROI切抜)").grid(row=4, column=0, sticky="w", pady=(6,2))
    ttk.Button(panel, text="OKを保存", command=self.save_ok).grid(row=5, column=0, sticky="ew")
    ttk.Button(panel, text="NGを保存", command=self.save_ng).grid(row=6, column=0, sticky="ew")
    ttk.Separator(panel).grid(row=7, column=0, sticky="ew", pady=4)
    # Training
    ttk.Label(panel, text="学習(Keras→TFLite)").grid(row=8, column=0, sticky="w", pady=(6,2))
    ttk.Button(panel, text="学習を実行", command=self.train_model_thread).grid(row=9, column=0, sticky="ew")
    ttk.Button(panel, text="モデルを読み込み", command=self.load_model).grid(row=10, column=0, sticky="ew")
    ttk.Separator(panel).grid(row=11, column=0, sticky="ew", pady=4)
    # PLC settings
    ttk.Label(panel, text="PLC 出力設定").grid(row=12, column=0, sticky="w", pady=(6,2))
    self.plc_mode = tk.StringVar(value=self.cfg.get("plc_mode","none"))
    ttk.Radiobutton(panel, text="なし", variable=self.plc_mode, value="none").grid(row=13, column=0, sticky="w")
    ttk.Radiobutton(panel, text="TCP(ASCII 1/0)", variable=self.plc_mode, value="tcp").grid(row=14, column=0, sticky="w")
    ttk.Radiobutton(panel, text="Modbus/TCP (coil)", variable=self.plc_mode, value="modbus").grid(row=15, column=0, sticky="w")
    # TCP settings
    tcpf = ttk.LabelFrame(panel, text="TCP設定")
    tcpf.grid(row=16, column=0, sticky="ew", pady=4)
    self.tcp_ip = tk.StringVar(value=str(self.cfg["plc_tcp"]["ip"]))
    self.tcp_port = tk.StringVar(value=str(self.cfg["plc_tcp"]["port"]))
    ttk.Label(tcpf, text="IP").grid(row=0, column=0, sticky="w")
    ttk.Entry(tcpf, textvariable=self.tcp_ip).grid(row=0, column=1, sticky="ew")
    ttk.Label(tcpf, text="Port").grid(row=1, column=0, sticky="w")
    ttk.Entry(tcpf, textvariable=self.tcp_port).grid(row=1, column=1, sticky="ew")
    # Modbus settings
    modf = ttk.LabelFrame(panel, text="Modbus/TCP設定")
    modf.grid(row=17, column=0, sticky="ew", pady=4)
    self.mb_ip = tk.StringVar(value=str(self.cfg["plc_modbus"]["ip"]))
    self.mb_port = tk.StringVar(value=str(self.cfg["plc_modbus"]["port"]))
    self.mb_uid = tk.StringVar(value=str(self.cfg["plc_modbus"]["unit_id"]))
    self.mb_coil = tk.StringVar(value=str(self.cfg["plc_modbus"]["coil_addr"]))
    ttk.Label(modf, text="IP").grid(row=0, column=0, sticky="w")
    ttk.Entry(modf, textvariable=self.mb_ip).grid(row=0, column=1, sticky="ew")
    ttk.Label(modf, text="Port").grid(row=1, column=0, sticky="w")
    ttk.Entry(modf, textvariable=self.mb_port).grid(row=1, column=1, sticky="ew")
    ttk.Label(modf, text="UnitID").grid(row=2, column=0, sticky="w")
    ttk.Entry(modf, textvariable=self.mb_uid).grid(row=2, column=1, sticky="ew")
    ttk.Label(modf, text="CoilAddr").grid(row=3, column=0, sticky="w")
    ttk.Entry(modf, textvariable=self.mb_coil).grid(row=3, column=1, sticky="ew")
    ttk.Separator(panel).grid(row=18, column=0, sticky="ew", pady=4)
    # Output mode control
    ttk.Label(panel, text="出力モード制御").grid(row=19, column=0, sticky="w", pady=(6,2))
    self.btn_run = ttk.Button(panel, text="推論開始", command=self.toggle_run)
    self.btn_run.grid(row=20, column=0, sticky="ew")
    # Status line
    self.status = tk.StringVar(value="準備OK")
    ttk.Label(panel, textvariable=self.status, foreground="#333").grid(row=21, column=0, sticky="w", pady=(6,2))
    # Preview state
    self.frame = None
    self.preview_running = True
    self.infer_running = False
    self.after_id = None
    # Inference state
    self.runner = TFLiteRunner()
    self.last_pred = None
    self.okng_text_id = None
    # PLC: the worker thread reads a snapshot instead of the Tk variables
    self.plc = PLCOutput(self._get_plc_snapshot)
    # Let the canvas cell grow with the window
    root.columnconfigure(0, weight=1)
    root.rowconfigure(0, weight=1)
    # Start the preview loop
    self.update_loop()
    # Shutdown hook
    root.protocol("WM_DELETE_WINDOW", self.on_close)

  # ----------------- Settings -----------------
  def get_plc_config(self):
    """Copy the PLC settings from the UI into ``self.cfg`` and return it.

    Must be called on the UI thread. Raises ValueError when a numeric entry
    does not contain an integer.
    """
    self.cfg["plc_mode"] = self.plc_mode.get()
    self.cfg["plc_tcp"]["ip"] = self.tcp_ip.get()
    self.cfg["plc_tcp"]["port"] = int(self.tcp_port.get())
    self.cfg["plc_modbus"]["ip"] = self.mb_ip.get()
    self.cfg["plc_modbus"]["port"] = int(self.mb_port.get())
    self.cfg["plc_modbus"]["unit_id"] = int(self.mb_uid.get())
    self.cfg["plc_modbus"]["coil_addr"] = int(self.mb_coil.get())
    return self.cfg

  def _refresh_plc_snapshot(self):
    """Rebuild the PLC settings snapshot used by the inference thread (UI thread only)."""
    try:
      cfg = self.get_plc_config()
    except (ValueError, tk.TclError):
      # An entry is empty or mid-edit: keep the last valid snapshot
      return
    self._plc_snapshot = {
      "plc_mode": cfg["plc_mode"],
      "plc_tcp": dict(cfg["plc_tcp"]),
      "plc_modbus": dict(cfg["plc_modbus"]),
    }

  def _get_plc_snapshot(self):
    """Return the latest PLC settings snapshot; safe to call from any thread."""
    return self._plc_snapshot

  def _drain_ui_queue(self):
    """Run the UI tasks queued by worker threads (UI thread only)."""
    while True:
      try:
        task = self._ui_queue.get_nowait()
      except queue.Empty:
        return
      task()

  # ----------------- ROI handling -----------------
  def on_mouse_down(self, event):
    """Start an ROI drag (setting mode only)."""
    if self.mode_var.get() != "setting":
      return
    self.dragging = True
    self.drag_start = (event.x, event.y)

  def on_mouse_drag(self, event):
    """Update the ROI rectangle while dragging."""
    if not self.dragging:
      return
    x0, y0 = self.drag_start
    x1, y1 = event.x, event.y
    # Canvas and frame coordinates coincide because the frame is always
    # drawn at 640x480 from the canvas origin, so no mapping is applied
    x = min(x0, x1)
    y = min(y0, y1)
    w = abs(x1 - x0)
    h = abs(y1 - y0)
    self.roi = [x, y, w, h]

  def on_mouse_up(self, event):
    """Finish the ROI drag and persist the rectangle."""
    if not self.dragging:
      return
    self.dragging = False
    self.save_current_roi()

  def save_current_roi(self):
    """Store the current ROI in the config file."""
    if self.roi is not None:
      self.cfg["roi"] = self.roi
      save_config(self.cfg)
      self.set_status(f"ROI保存: {self.roi}")

  # ----------------- Capture / save -----------------
  def capture_frame(self):
    """Read one frame resized to 640x480, or None when the camera gives none.

    The camera is shared by the preview and the inference thread, so the
    read is serialized with ``_cap_lock``.
    """
    with self._cap_lock:
      ret, frame = self.cap.read()
    if not ret or frame is None:
      return None
    # Resize to 640x480 so drawing and ROI coordinates line up
    frame = cv2.resize(frame, (640, 480), interpolation=cv2.INTER_AREA)
    return frame

  def save_ok(self):
    """Save the current ROI crop as an OK sample."""
    self.save_sample(is_ok=True)

  def save_ng(self):
    """Save the current ROI crop as an NG sample."""
    self.save_sample(is_ok=False)

  def save_sample(self, is_ok: bool):
    """Write the ROI crop of the latest frame to the OK or NG folder (setting mode only)."""
    if self.mode_var.get() != "setting":
      messagebox.showwarning("保存", "設定モードで実行してください。")
      return
    frame = self.frame if self.frame is not None else self.capture_frame()
    if frame is None:
      messagebox.showerror("保存", "フレームが取得できませんでした。")
      return
    crop = crop_roi(frame, self.roi)
    fn = f"{now_str()}.jpg"
    out_dir = OK_DIR if is_ok else NG_DIR
    cv2.imwrite(str(out_dir / fn), crop)
    self.set_status(f"{'OK' if is_ok else 'NG'} 保存: {fn}")

  # ----------------- Training -----------------
  def train_model_thread(self):
    """Start training in a background thread (setting mode only, one run at a time)."""
    if self.mode_var.get() != "setting":
      messagebox.showwarning("学習", "設定モードで実行してください。")
      return
    if self._train_thread is not None and self._train_thread.is_alive():
      messagebox.showwarning("学習", "学習を実行中です。完了までお待ちください。")
      return
    self._train_thread = threading.Thread(target=self._train_model_impl, daemon=True)
    self._train_thread.start()

  def _train_model_impl(self):
    """Worker body: train, export, and report the outcome through the UI queue."""
    try:
      train_and_export_tflite(self.set_status)
      self.set_status("学習完了: model.tflite を出力しました。")
    except Exception as e:
      msg = str(e)
      # Dialogs must be opened on the UI thread
      self._ui_queue.put(lambda: messagebox.showerror("学習エラー", msg))
      self.set_status(f"学習エラー: {msg}")

  def load_model(self):
    """Load the exported model into the runner and report the result."""
    try:
      self.runner.load(MODEL_PATH)
      self.set_status("モデル読み込みOK")
    except Exception as e:
      messagebox.showerror("モデル読み込み", str(e))

  # ----------------- Output mode -----------------
  def on_mode_change(self):
    """React to the mode radio buttons; leaving output mode stops inference."""
    mode = self.mode_var.get()
    if mode == "output":
      self.set_status("出力モード:推論を開始するには『推論開始』を押してください。")
    else:
      self.stop_infer()
      self.set_status("設定モード:ROI指定や撮影保存、学習ができます。")

  def toggle_run(self):
    """Start inference when stopped, stop it when running."""
    if self.infer_running:
      self.stop_infer()
    else:
      self.start_infer()

  def start_infer(self):
    """Load the model and start the inference thread."""
    worker = self._infer_thread
    if worker is not None and worker.is_alive():
      if self.infer_running:
        return  # already running
      # A just-stopped loop may still be finishing its last iteration; the
      # interpreter must not be used by two threads at once
      worker.join(timeout=1.0)
      if worker.is_alive():
        self.set_status("前回の推論ループが停止中です。少し待って再実行してください。")
        return
    # Load the model
    try:
      self.runner.load(MODEL_PATH)
    except Exception as e:
      messagebox.showerror("推論開始", f"モデルがありません。学習してください。\n{e}")
      return
    self.infer_running = True
    self.btn_run.config(text="停止")
    self.set_status("推論ループ開始")
    self._infer_thread = threading.Thread(target=self.infer_loop, daemon=True)
    self._infer_thread.start()

  def stop_infer(self):
    """Ask the inference loop to stop and reset the button label."""
    self.infer_running = False
    self.btn_run.config(text="推論開始")
    self.set_status("推論停止")

  def infer_loop(self):
    """Worker body: capture, classify and send to the PLC until stopped."""
    while self.infer_running:
      frame = self.capture_frame()
      if frame is None:
        time.sleep(0.1)
        continue
      try:
        pred_idx, ok_prob, ng_prob, _ = self.runner.infer(frame, self.roi)
        # pred_idx: 0=NG, 1=OK
        value = 1 if pred_idx == 1 else 0
        # Picked up by update_loop for the on-screen overlay
        self.last_pred = (value, ok_prob, ng_prob)
        # PLC output (sent synchronously to keep things simple)
        self.plc.send_value(value)
      except Exception as e:
        self.set_status(f"推論エラー: {e}")
      time.sleep(INFER_INTERVAL_SEC)

  # ----------------- Screen update -----------------
  def update_loop(self):
    """Periodic UI tick: run queued UI work, refresh PLC settings, redraw the preview."""
    self._drain_ui_queue()
    self._refresh_plc_snapshot()
    # Preview
    if self.preview_running:
      frame = self.capture_frame()
      if frame is not None:
        # Keep an unannotated copy for save_sample
        self.frame = frame.copy()
        # ROI rectangle
        if self.roi is not None:
          x, y, w, h = self.roi
          cv2.rectangle(frame, (x,y), (x+w, y+h), (0,255,0), 2)
        # Inference result overlay
        if self.last_pred is not None and self.mode_var.get() == "output":
          value, okp, ngp = self.last_pred
          label = f"{'OK' if value==1 else 'NG'} (OK:{okp:.2f} NG:{ngp:.2f})"
          color = (0,200,0) if value==1 else (0,0,220)
          cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2, cv2.LINE_AA)
        img = to_tk_image(frame)
        # Reuse a single canvas item; creating one per frame would pile up
        # items on the canvas for as long as the app runs
        if self._canvas_image_id is None:
          self._canvas_image_id = self.canvas.create_image(0, 0, anchor="nw", image=img)
        else:
          self.canvas.itemconfig(self._canvas_image_id, image=img)
        # Hold a reference so the PhotoImage is not garbage-collected
        self.canvas.image = img
    self.after_id = self.root.after(30, self.update_loop)

  def set_status(self, s):
    """Print ``s`` and show it in the status line; callable from any thread."""
    print(s)
    if threading.current_thread() is self._ui_thread:
      self.status.set(s)
    else:
      # Tk variables are only written on the UI thread
      self._ui_queue.put(lambda: self.status.set(s))

  def on_close(self):
    """Stop the loops, release the camera, save the settings and close the window."""
    try:
      self.preview_running = False
      self.infer_running = False
      if self.after_id is not None:
        self.root.after_cancel(self.after_id)
      if self.cap is not None:
        # Wait for an in-flight read on the inference thread to finish
        with self._cap_lock:
          if self.cap.isOpened():
            self.cap.release()
    except Exception as e:
      print(f"[close] cleanup error: {e}")
    try:
      cfg = self.get_plc_config()
    except (ValueError, tk.TclError) as e:
      # A non-numeric entry must not prevent the window from closing;
      # fall back to the last values held in self.cfg
      print(f"[close] invalid PLC setting ignored: {e}")
      cfg = self.cfg
    try:
      save_config(cfg)
    except OSError as e:
      print(f"[close] could not save config: {e}")
    self.root.destroy()
# -------------------------------
# main
# -------------------------------
def main():
  """Create the data folders, build the Tk window and run the event loop."""
  ensure_dirs()
  window = tk.Tk()
  # Keep a reference to the App for the lifetime of the event loop
  app = App(window)
  window.mainloop()


if __name__ == "__main__":
  main()
まとめ
- プロンプトを投げるだけで、学習〜推論〜PLC連携まで網羅したラズパイ用アプリが生成される
- エラーが出れば,ChatGPTに聞けばOK。システム製作から保守までプログラムの知識は必要なし。