3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#レーザポインタを追いかけるカメラ

Last updated at Posted at 2020-06-07

#はじめに
機械学習により学習したレーザポインタの輝点をXavierNXを用いて自動追尾するカメラをつくりました。
前回まででレーザポインタの輝点の位置は取得できたので、ステッピングモータのコントロールと合わせ、最終的に輝点を自動追尾します。
一連のエントリはこちら
1,USB-IOのコントロール
2,USB-IO経由でステッピングモータのコントロール
3,レーザポインタを物体検出するための学習データの作成
3.5. Google Colabを使って学習
4,レーザポインタを追いかけるカメラ −−いまここ

#やったこと
「2,USB-IO経由でステッピングモータのコントロール」でステッピングモータを2個コントロールし、カメラの方向制御ができるようになっています。これと「3,レーザポインタを物体検出するための学習データの作成」で学習したデータを組み合わせ、レーザポインタの輝点を追いかけるようにしました。

#スクリプト
Darknetのサンプルのdarknet_video.pyにモータ制御をくっつけただけです。

dvl.py
from ctypes import *
import math
import random
import os
import cv2
import numpy as np
import time
import darknet
import StepMotor
import threading


# Two stepper-motor drivers sharing one USB-IO board: Y uses pins 0-3, X uses
# pins 4-7.  cvDrawBoxes drives StepMotorX from horizontal offsets and
# StepMotorY from vertical offsets of the detected laser spot.
StepMotorY = StepMotor.C28BYJ48(IN1=0 ,IN2=1, IN3=2, IN4=3)
StepMotorX = StepMotor.C28BYJ48(IN1=4, IN2=5, IN3=6, IN4=7)


def convertBack(x, y, w, h):
    """Turn a centre/size box into integer corner coordinates.

    Args:
        x, y: centre of the box.
        w, h: full width and height of the box.

    Returns:
        Tuple ``(xmin, ymin, xmax, ymax)``, each value rounded with the
        built-in ``round`` and converted to ``int``.
    """
    half_width = w / 2
    half_height = h / 2
    left, right = (int(round(x + offset)) for offset in (-half_width, half_width))
    top, bottom = (int(round(y + offset)) for offset in (-half_height, half_height))
    return left, top, right, bottom


def cvDrawBoxes(detections, img):
    """Draw every detection on ``img`` and steer the motors toward laser spots.

    Args:
        detections: iterable of ``(label_bytes, confidence, (x, y, w, h))``
            entries, where ``x, y`` is the box centre in ``img`` pixels.
        img: image array the boxes are drawn on (modified in place).

    Returns:
        Tuple ``(img, found)`` where ``found`` is the number of detections
        labelled ``b'laser'``.

    For each laser detection the spot position is printed and the motors are
    stepped so the spot moves toward the image centre.  The centre is taken
    from ``img.shape`` instead of a fixed 304, so the tracking stays correct
    when the network input is not 608x608.
    """
    # Offsets (pixels from centre) at which a correction is issued, and the
    # number of motor steps sent for each.
    near_px, far_px = 20, 60
    small_step, large_step = 5, 20
    step_wait = 0.001

    found = 0
    for detection in detections:
        label, confidence, box = detection[0], detection[1], detection[2]
        x, y, w, h = box[0], box[1], box[2], box[3]
        xmin, ymin, xmax, ymax = convertBack(
            float(x), float(y), float(w), float(h))
        pt1 = (xmin, ymin)
        pt2 = (xmax, ymax)
        cv2.rectangle(img, pt1, pt2, (0, 255, 0), 1)
        cv2.putText(img,
                    label.decode() +
                    " [" + str(round(confidence * 100, 2)) + "]",
                    (pt1[0], pt1[1] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                    [0, 255, 0], 2)
        if label != b'laser':
            continue

        print(x, y)
        center_y = img.shape[0] / 2
        center_x = img.shape[1] / 2

        # The near and far checks are independent on purpose: a spot beyond
        # the far threshold triggers both commands (small + large steps),
        # executed back to back because ThreadStep waits for the previous
        # move of the same motor.
        if x > center_x + near_px:
            StepMotorX.ThreadStep(-small_step, step_wait)
        if x > center_x + far_px:
            StepMotorX.ThreadStep(-large_step, step_wait)
        if x < center_x - near_px:
            StepMotorX.ThreadStep(+small_step, step_wait)
        if x < center_x - far_px:
            StepMotorX.ThreadStep(+large_step, step_wait)
        if y > center_y + near_px:
            StepMotorY.ThreadStep(+small_step, step_wait)
        if y > center_y + far_px:
            StepMotorY.ThreadStep(+large_step, step_wait)
        if y < center_y - near_px:
            StepMotorY.ThreadStep(-small_step, step_wait)
        if y < center_y - far_px:
            StepMotorY.ThreadStep(-large_step, step_wait)
        found += 1

    return img, found


# Module-level slots filled in by YOLO() the first time it runs:
# the loaded network, the loaded metadata, and the list of class names.
netMain = None
metaMain = None
altNames = None


def _load_alt_names(meta_path):
    """Best-effort read of the class-name list referenced by ``meta_path``.

    Looks for a ``names = <file>`` entry in the data file and returns the
    stripped lines of that file, or ``None`` when the entry or the file is
    missing or unreadable.
    """
    import re
    try:
        with open(meta_path) as meta_fh:
            meta_contents = meta_fh.read()
        match = re.search("names *= *(.*)$", meta_contents,
                          re.IGNORECASE | re.MULTILINE)
        if match is None:
            return None
        names_path = match.group(1)
        if not os.path.exists(names_path):
            return None
        with open(names_path) as names_fh:
            return [name.strip()
                    for name in names_fh.read().strip().split("\n")]
    except (OSError, UnicodeError) as err:
        # Class names are optional, so report the problem and carry on.
        print("Warning: could not load class names: " + str(err))
        return None


def YOLO():
    """Run the detect-and-track loop on the camera feed.

    Loads the Darknet network and metadata once (cached in the module
    globals), opens the camera through a GStreamer pipeline and, for every
    processed frame, runs detection, lets ``cvDrawBoxes`` annotate the frame
    and steer the motors, shows the result in a window and appends it to
    ``laser.mp4``.

    The loop ends when the capture stops delivering frames.  The capture and
    the video writer are released on every exit path (end of stream, Ctrl-C,
    or an exception) so the output file gets closed.

    Raises:
        ValueError: if the config, weights or data file does not exist.
    """
    global metaMain, netMain, altNames

    configPath = "./yolov3-voc.cfg"
    weightPath = "./yolov3-voc_2000.weights"
    metaPath = "./voc.data"
    for kind, path in (("config", configPath),
                       ("weight", weightPath),
                       ("data file", metaPath)):
        if not os.path.exists(path):
            raise ValueError("Invalid " + kind + " path `" +
                             os.path.abspath(path) + "`")

    if netMain is None:
        netMain = darknet.load_net_custom(configPath.encode(
            "ascii"), weightPath.encode("ascii"), 0, 1)  # batch size = 1
    if metaMain is None:
        metaMain = darknet.load_meta(metaPath.encode("ascii"))
    if altNames is None:
        altNames = _load_alt_names(metaPath)

    src = 'v4l2src device= /dev/video0 ! image/jpeg,width=1280,height=720 !jpegdec !videoconvert ! appsink'
    # Let any motor movement still in progress finish before tracking starts.
    StepMotorX.ThreadWait()
    StepMotorY.ThreadWait()

    net_size = (darknet.network_width(netMain),
                darknet.network_height(netMain))
    cap = cv2.VideoCapture(src)
    out = cv2.VideoWriter(
        "laser.mp4", cv2.VideoWriter_fourcc(*"mp4v"), 10.0, net_size)
    print("Starting the YOLO loop...")
    # Create an image we reuse for each detect
    darknet_image = darknet.make_image(net_size[0], net_size[1], 3)

    try:
        while True:
            # Two reads per iteration; only the second frame is processed.
            # NOTE(review): presumably this drops a stale buffered frame to
            # reduce latency - confirm before changing.
            cap.read()
            ret, frame_read = cap.read()
            if not ret or frame_read is None:
                # Camera unplugged or stream ended: stop instead of crashing
                # inside cvtColor on an empty frame.
                break

            frame_rgb = cv2.cvtColor(frame_read, cv2.COLOR_BGR2RGB)
            frame_resized = cv2.resize(frame_rgb, net_size,
                                       interpolation=cv2.INTER_LINEAR)

            darknet.copy_image_from_bytes(darknet_image,
                                          frame_resized.tobytes())
            detections = darknet.detect_image(
                netMain, metaMain, darknet_image, thresh=0.70)

            image, _found = cvDrawBoxes(detections, frame_resized)
            # The annotated frame is RGB; swap back to BGR for display and
            # for the video file.
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            cv2.imshow('Demo', image)
            out.write(image)
            cv2.waitKey(3)
    finally:
        cap.release()
        out.release()

# Start the tracking loop only when this file is executed as a script.
if __name__ == "__main__":
    YOLO()

StepMotor.py
import time
import hid
import threading


class USBIO:
    """Wrapper around a Km2Net USB-IO2.0(AKI) HID device.

    Pin levels are staged in ``self.pin`` with ``setPinLevel`` and sent to
    the device in a single report by ``outputToPin``.  The HID handle and the
    lock are class attributes, so they are shared by every instance.
    """
    VENDOR_ID = 0x1352       # Km2Net
    PRODUCT_ID = 0x0121  # USB-IO2.0(AKI)
    lock = threading.Lock()
    # Command list
    CMD_READ_WRITE = 0x20
    CMD_CONFIG_READ = 0xf8
    CMD_CONFIG_WRITE = 0xf9
    # NOTE(review): created when the class body is executed (i.e. on import);
    # confirm that this constructor call also opens the device with the
    # installed ``hid`` binding.
    usb = hid.device(VENDOR_ID, PRODUCT_ID)

    def __init__(self):
        # Staged output level for each of the 12 pins.
        self.pin = [0] * 12

    def writePin(self, p1, p2):
        """Send one read/write report and return the device's 64-byte reply.

        Prior to using this, all pins need to be configured as output pins.

        Args:
            p1: value placed after the selector byte ``1`` in the report.
            p2: value placed after the selector byte ``2`` in the report.

        The write and the matching read happen under one lock so that
        concurrent motor threads cannot interleave their requests and
        replies; the lock is released even if the USB call raises.
        """
        data = [0] * 64
        data[0] = self.CMD_READ_WRITE
        # Presumably (port number, value) pairs - confirm against the
        # USB-IO2.0 protocol description.
        data[1] = 1
        data[2] = p1
        data[3] = 2
        data[4] = p2
        data[63] = 89    # dummy to confirm USB-IO recognition
        with self.lock:  # make sure only one thread uses the USB device
            self.usb.write(data)
            return self.usb.read(64)

    def setPinLevel(self, pin_no, level):
        """Stage ``level`` for pin ``pin_no``; nothing is sent yet."""
        self.pin[pin_no] = level

    def outputToPin(self):
        """Pack the staged pin levels into two values and send them.

        Pins 0-7 form ``p1`` (pin 0 is the least significant bit) and
        pins 8-11 form the low four bits of ``p2``.
        """
        p1 = sum(level * (1 << bit)
                 for bit, level in enumerate(self.pin[0:8]))
        p2 = sum(level * (1 << bit)
                 for bit, level in enumerate(self.pin[8:12]))
        self.writePin(p1, p2)


# Module-level USBIO instance that the C28BYJ48 methods below use for all
# pin output.
UsbIO=USBIO(  )   # To use USBIO as a kind of singleton


class C28BYJ48:
    """Driver for one stepper motor wired to four USB-IO pins.

    Movement runs through ``Step``; ``ThreadStep`` runs the same movement on
    a background thread so the caller is not blocked while the motor turns.
    """

    def __init__(self, IN1, IN2, IN3, IN4):
        """Remember the four USB-IO pin numbers and reset the position."""
        self.mPin = [IN1, IN2, IN3, IN4]
        # Thread started by the most recent ThreadStep call, if any.
        self.stepThread = None

        # Running step counter; its value modulo 8 selects a row of mSeq.
        self.nPos = 0
        # Eight-row excitation table: one row per step, one column per pin.
        self.mSeq = [
            [1, 0, 0, 1],
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
        ]
        # Default pause between steps (the original author describes this
        # as the maximum speed).
        self.SetWaitTime(0.001)

    def SetPinsVoltage(self, nSeq):
        """Stage the pin levels of table row ``nSeq`` on the shared USB-IO."""
        for pin_no, cell in zip(self.mPin, self.mSeq[nSeq]):
            UsbIO.setPinLevel(pin_no, 0 if cell == 0 else 1)

    def SetWaitTime(self, wait):
        """Set the pause, in seconds, applied after every step."""
        self.mStep_wait = wait

    def Step(self, step, wait):
        """Move ``abs(step)`` steps, blocking until done.

        A positive ``step`` counts the position up, anything else counts it
        down.  ``wait`` becomes the new pause between steps.
        """
        self.SetWaitTime(wait)
        delta = 1 if step > 0 else -1
        for _ in range(abs(step)):
            self.nPos += delta
            self.SetPinsVoltage(self.nPos % 8)
            UsbIO.outputToPin()
            time.sleep(self.mStep_wait)

    # Entry point to control the stepping motor.
    def ThreadStep(self, step, wait):
        """Run ``Step(step, wait)`` on a new background thread.

        If an earlier movement is still running, wait for it to finish
        before starting the new thread.
        """
        self.ThreadWait()
        worker = threading.Thread(target=self.Step, args=(step, wait))
        self.stepThread = worker
        worker.start()

    def ThreadWait(self):
        """Block until the last background movement, if any, has finished."""
        pending = self.stepThread
        if pending is not None:
            pending.join()

    def Cleanup(self):
        """Wait for any movement to finish, then drive all four pins low."""
        self.ThreadWait()
        for pin_no in self.mPin:
            UsbIO.setPinLevel(pin_no, 0)
        UsbIO.outputToPin()


if __name__ == '__main__':
    # Manual smoke test: turn the motor on pins 0-3 by 300 steps in the
    # negative direction, pause, then switch its coils off.
    StepMoterX = C28BYJ48(IN1=0 ,IN2=1, IN3=2, IN4=3)
    StepMoterX.ThreadStep(-300,0.001)
    # Was ``Sleep(5)``, an undefined name that raised NameError.
    time.sleep(5)
    StepMoterX.Cleanup()

#動作
実際に動かして、レーザポインタを追いかけている様子が以下です。
部屋が写る範囲を最小限にしたので、いまいち面白いビデオではないですが。。。
白い箱の中身はUSB-IOとモータドライバ
JetsonXavierNXはUSBで大分はなれたところにおいてあります。
JetsonNanoでYolov3を動かす場合、現状の簡単なクローズドループでの実装では処理が遅すぎて発振してしまうかもしれません。

ezgif.com-video-to-gif(7).gif

カメラがみているのは以下の感じです。
ezgif.com-video-to-gif(8).gif

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?