4
1

More than 3 years have passed since last update.

PythonでTELLO-EDUの操作

Posted at

環境

機器・ソフト バージョン
ノートPC Windows10 Home
Python 3.7.0
opencv-python 3.4.2.17

自作モジュール解説

自作モジュールはGitHubにおいてあります。

各種セットアップ

セットアップはsetup_tello()で行っています。__init__()でもよかったのですが……

SDKコマンドやりとり

TelloへのSDKコマンドの送信またそれに対する応答は113行目にあるgo_tello()で制御しています。
まずgo_tello()に引数でコマンドを受け取りコマンドをsock.sendto()で送ります。その後154行目のwhileループに突入し、self.RECIEVE_DATANoneの間すなわち、ドローンから応答が返って来るまで処理待ち状態にします。ドローンからの応答は、

実行成功: OK
実行失敗: error (エラー文が続く場合がある)

のような形式で返ってきます。
ドローンの応答が返ってきたら、クールダウンを行います。TELLO-EDUはコマンドとコマンドの間を短くするとerror not joystickなるエラーが返ってきます。なのでthreading.Timer()を用いて処理待ち状態にします。ちなみにtime.sleep()だとなぜかうまくいきませんでした。

    def __receive_thread__(self):
        """
        Telloからの応答監視スレッド
        最後の応答がreceive_dataに格納される
        """
        while self._receive_going:
            try:
                self.RECEIVE_DATA, ip = self.sock.recvfrom(3000)
            except:
                print(traceback.format_exc())
    def go_tello(self, S, interval=1.0):
        """
        S : コマンド(大文字小文字どちらでもいい)
        interval : 入力コマンド間の遅延

        command
        takeoff
        land
        up[down, left, right, forward, back] "Value" : "Value" = 20-500
        cw[ccw] "Value" : "Value" = 1-360
        flip "Direction" : "Direction" = l(left), r(right), f(forward), b(back)
        """

        # command, emergency は処理上ない
        _CONTROL_COMMANDS_ = ["takeoff", "land", "streamon", "streamoff", "up", "down", "left",
                              "right", "forward", "back", "cw", "ccw", "flip", "go", "stop",
                              "curve", "jump"]

        _SET_COMMANDS_ = ["speed", "rc", "wifi",
                          "mon", "moff", "mdirection", "ap"]

        _READ_COMMANDS_ = ["speed?", "battery?", "time?", "wifi?", "height?", "temp?", "attitude?"
                           "baro?", "acceleration?", "tof?"]

        _ALL_COMMANDS_ = _CONTROL_COMMANDS_ + _SET_COMMANDS_ + _READ_COMMANDS_

        if isinstance(S, str):
            S = S.lower()
            self.DATA_FLAG = False
            sent = self.sock.sendto(S.encode("utf-8"),
                                    self.tello_address)  # コマンド送信
            print(S, ">> Tello")

            # タイマースレッドで処理待ち状態に
            # time.sleepとかだとうまくいかない
            timer = threading.Timer(interval, self.set_flag_TRUE)
            timer.start()  # タイマースレッドスタート

            S = S.split()
            S = S[0]

            while self.RECEIVE_DATA is None:
                if S in _ALL_COMMANDS_:
                    continue
                if self.DATA_FLAG:
                    break
            timer.cancel()  # タイマースレッドストップ

            if self.RECEIVE_DATA is None:
                response = "NULL"
            else:
                response = self.RECEIVE_DATA.decode("utf-8")
            self.RECEIVE_DATA = None
            print(response + " << Tello")
        else:
            raise TypeError
        self.DATA_FLAG = False
        timer = threading.Timer(interval, self.set_flag_TRUE)
        timer.start()  # タイマースレッドスタート
        while True:
            if self.DATA_FLAG:
                break

カメラ制御

Telloのカメラ制御にはpythonのopencv-pythonモジュールを使用しました。(バージョンは最新のものではない)

カメラ制御の挙動としては、Telloが他のSDKコマンドを実行している最中でもPCでTelloの映像をリアルタイムで表示できるようにしました。再びthreadingを使用します。
VIDEO_FRAMEにフレームデータを格納し、それをcv2.imshow()で表示しています。cv2.waitKey()はキー入力を引数に与えた時間(単位はミリ秒)だけ待つ関数で一見必要ないように見えますが、これがないとcv2.imshow()ができないので入れています。

def __video_thread__(self):
    """
    ビデオスレッド
    カメラのフレームデータをself.VIDEO_FRAMEに格納する
    """
    self.VIDEO_FRAME = None
    self.rect = []
    while self._video_going:
        ret, self.VIDEO_FRAME = self.cap.read()
        if len(self.rect) == 1: # 顔が検出された場合、四角で囲む
            x, y, w, h = self.rect[0]
            cv2.rectangle(self.VIDEO_FRAME, tuple([x, y]), tuple([x + w, y + h]), (0, 0, 255), thickness=3)
        cv2.imshow("frame_data", self.VIDEO_FRAME)
        _ = cv2.waitKey(1)
    #cv2.destroyAllWindows()

またtake_picture()という写真撮影を行う関数も作成しました。引数filenameは撮影した画像データを保存するときのファイル名で、特に指定がなければframe_filenameが適当にナンバリングしてくれます。

def take_picture(self, filename=None):
    """
    Telloのカメラの最新フレームを保存する
    保存先はself.frame_save_path
    保存名はself.frame_filename
    """
    if filename is None:  # ユーザがファイル名を指定しなかった場合ファイル名をナンバリング
        filename = self.frame_filename
        self.frame_filename = str(int(self.frame_filename))
        self.frame_filename = str(eval(self.frame_filename + " + 1"))
        self.frame_filename = self.frame_filename.zfill(6)  # 整形のためゼロ埋め
        filename = self.frame_filename
    cv2.imwrite(self.frame_save_path + filename + ".jpg", self.VIDEO_FRAME)
    print(f"Saved the photo as filename:{filename} << Tello")

顔認識

顔認識にはOpenCVに付属していたカスケード分類器を使用しました。付属の分類器の顔認識の(大雑把な)精度は以下の通りでした。

分類器 精度
haarcascade_frontalcatface 良くなかった
haarcascade_frontalcatface_extended 良くなかった
haarcascade_frontalface_alt 多少認識する
haarcascade_frontalface_alt_tree 良くなかった
haarcascade_frontalface_alt2 良かった
haarcascade_frontalface_default 良かった

pytello.pyにある通り私が実装した際にはhaarcascade_frontalface_default.xmlを使用しました(alt2でもいい感じかも)。

顔認識は294行目のface_detection()にあります。スレッドで走らせている__video_thread__()VIDEO_FRAMEからopencvのフレームデータを読み取り、グレースケールにしてHaarCascadeで殴っています。

def face_detection(self):
    """
    現在のフレームデータ(self.VIDEO_FRAME)にて人の顔があるかどうか識別
    肝心な顔認識の部分はHaar Cascadeで殴ってる
    返し値は認識した顔の[(左上のx座標), (左上のy座標), (横幅), (縦線)]が、
    認識した顔の数だけlistで返される
    """
    self.rect = []
    cv2.imwrite("./_camera_frame/temp.jpg", self.VIDEO_FRAME)  # いったん空打
    src = cv2.imread("./_camera_frame/temp.jpg", 0)
    gray = cv2.cvtColor(src, cv2.cv2.COLOR_BAYER_BG2GRAY)
    cascade = cv2.CascadeClassifier(self.cascade_path)
    self.rect = cascade.detectMultiScale(gray, minSize=(100, 100))
    return self.rect

色認識

色認識はHSV閾値を用いて行いました。
color_detection()に引数upper, lowerとして閾値を入力します。引数を受け取った関数はVIDEO_FRAMEからフレームデータを取得しcv2.cvtColor()でHSVに変換して、指定色範囲以外をマスクします。マスク結果はfind_specific_color()に引数として渡されます。

find_specific_color()は特定の色の座標を返すプログラムで、TonyMoooriさんのGiuHubから使用しました。

def color_detection(self, lower, upper):
    """
    閾値に対して色検出を行いその座標を返す。
    lower, upperにそれぞれ[h, s, v]を指定してぶち込む。
    """
    cv2.imwrite("./_camera_frame/temp.jpg", self.VIDEO_FRAME)
    img = cv2.imread("./_camera_frame/temp.jpg")

    # HSVに変換
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # numpyのarrayじゃなきゃだめじゃね?
    lower = np.array(lower) 
    upper = np.array(upper)

    # 指定色範囲以外をマスク
    img_mask = cv2.inRange(hsv, lower, upper)
    res = cv2.bitwise_and(img, img, mask=img_mask)
    cv2.imwrite("./_camera_frame/teemp.jpg", res)  # マスクした結果を保存

    # 認識対象の色のx座標, y座標, 面積を返す
    return self.find_specific_color(img, 0, lower, upper)

def find_specific_color(self, frame, AREA_RATIO_THRESHOLD, LOW_COLOR, HIGH_COLOR): # https://gist.github.com/TonyMooori/4cc29c94f7bdbade6ff6102fef45232e 
    """
    指定した範囲の色の物体の座標を取得する関数
    frame: 画像
    AREA_RATIO_THRESHOLD: area_ratio未満の塊は無視する
    LOW_COLOR: 抽出する色の下限(h,s,v)
    HIGH_COLOR: 抽出する色の上限(h,s,v)
    """
    h, w, c = frame.shape
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    ex_img = cv2.inRange(hsv, LOW_COLOR, HIGH_COLOR)

    # 面積を計算
    _, contours, hierarchy = cv2.findContours(
        ex_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    areas = np.array(list(map(cv2.contourArea, contours)))

    if len(areas) == 0 or np.max(areas) / (h * w) * 10000 < AREA_RATIO_THRESHOLD:
        # 見つからなかったらNoneを返す
        return None, None, None
    else:
        # 面積が最大の塊の重心を計算し返す
        max_idx = np.argmax(areas)
        max_area = areas[max_idx]
        result = cv2.moments(contours[max_idx])
        x = int(result["m10"]/result["m00"])
        y = int(result["m01"]/result["m00"])
        return [x, y, np.max(areas) / (h * w) * 10000]
4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1