LoginSignup
67
65

More than 5 years have passed since last update.

トイドローン Tello をプログラミングで機能拡張!顔認識と自動追尾を実装してみた

Last updated at Posted at 2018-06-24

OpenCVでキャプチャしたTelloのカメラ映像を顔認識できるようにし、さらに認識した顔に対して一定の距離を保ち、画面中央に顔が来るように自動追尾する仕組みをPythonで実装してみました。

以下の記事の続きです。

トイドローン Tello をMacで操作&カメラ映像をOpenCVで受信してみる

前回の記事同様、予期しない動きをしたり制御不能になるリスクが伴いますので、周囲の環境に注意し、自己責任でお試しください。

動作確認環境

macOS 10.12.6
python 3.6.5
opencv 3.4.1

実行方法

キー入力検出のため、sudoで実行してください。

sudo python drone.py

以下のキーバインドを追加しています。

キー(追加) 動作
9 顔認識ON / 顔認識OFF
0 自動追尾ON / 自動追尾OFF

起動直後はどちらもOFFに設定されています。
顔認識がOFFのときは自動追尾をONにできません。
また顔認識ONかつ自動追尾ONのときに顔認識をOFFにすると、自動追尾もOFFになります。

主な仕組み

前回の記事の内容に加え、以下の機能を実装しています。

顔認識

OpenCVに予め用意されている分類器(Haar Cascade)を使って顔認識を実装しています。

drone.py
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

私の場合、以下のフォルダにインストールされていたものを実行プログラムと同じフォルダにコピーしました。

/usr/local/Cellar/opencv/3.4.1_5/share/OpenCV/haarcascades/haarcascade_frontalface_default.xml

顔認識の処理はやはり重いので、顔認識したあとはTracking APIを使って追尾するようにしました。Haar Cascadeによる顔認識時は2秒間隔で検出処理を行い、顔認識したあとは0.1秒間隔でトラッキング処理をさせています。

tello.py
# 顔認識またはトラッキング処理を実行するフラグをたてるためのタイマー
self.tracking_interval = 2
def _timer_detect(self):
    self.is_detect = True
    if not self.stop_drone:
        t = Timer(self.tracking_interval, self._timer_detect)
        t.start()
drone.py
# drone.is_detectがTrueのときに顔認識またはトラッキング処理を実行
if drone.is_detect:
    # Haar Cascadeによる顔認識
    if tracking == 0 or tracking == 2:
        rc = tracker_init(frame)
        if rc == 0:
            if tracking == 0:
                ret = tracker.init(frame, bbox)
            else:
                tracker_create(tracker_type)
                ret = tracker.init(frame, bbox)
            # 顔認識したら drone.tracking_intervalを0.1秒にしてトラッキングモードへ移行
            tracking = 1
            drone.tracking_interval = 0.1
    # トラッキング処理
    else:
        ret, bbox = tracker.update(frame)
        # Draw bounding box
        if ret:
            p1 = (int(bbox[0]), int(bbox[1]))
            p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
            cv2.rectangle(frame, p1, p2, (255,0,0), 2, 1)
        else :
            print("Tracking failure detected")
            # トラッキングエラーが発生したらdrone.tracking_intervalを2秒にして顔認識に戻る
            tracking = 2
            drone.tracking_interval = 2
    # 顔認識またはトラッキング処理をしたらdrone.is_detectをFalseにする
    # drone.time_intervalの間隔でTrueに変わり、再度、顔認識またはトラッキング処理が実行される
    drone.is_detect = False

顔認識のON/OFFの状態は、カメラ映像左下のTracking:True/Falseで確認できます。また顔が検出できているときは、顔までの距離(cm)、画面中央から顔の中心までの距離(px)も表示されます。顔までの距離は、100cmのときの検出領域の平均的な面積(160px*160px ※私個人の場合)を基準として簡易に算出しています。

drone.py
d = round(L0 * m.sqrt(S0 / (w * h)))

自動追尾

自動追尾というほど高度なものではないですが、顔までの距離を一定に保ち、顔を画面中央に位置するように自動でコントロールする仕組みを入れてみました。

顔認識ONのときに数字の0を押すと、自動追尾ON/OFFの切り替えができます。自動追尾中は、離着陸以外、キーによる操縦ができません(自動追尾が不安定になったら0を押せばその場にホバリングします)。自動追尾ON/OFFの状態も、カメラ映像左下のAutoPilot:True/Falseで確認できます。

一定に保つ距離は以下で設定しています(単位はcmです)。顔認識から算出した距離が下記設定値から±15cmを超えると前進後退して調整します。

# Base Distance
LB = 120

左右については中心から±80px、上下については中心から±50pxを超えると、左右上下移動して調整します(回転はしません)。

if (d - LB) > 15:
    drone.pitch = drone.STICK_HOVER + drone.STICK_L
elif (d - LB) < -15:
    drone.pitch = drone.STICK_HOVER - drone.STICK_L
else:
    drone.pitch = drone.STICK_HOVER
if dx > 80:
    drone.roll = drone.STICK_HOVER + drone.STICK_L
elif dx < -80:
    drone.roll = drone.STICK_HOVER - drone.STICK_L
else:
    drone.roll = drone.STICK_HOVER
if dy > 50:
    drone.thr = drone.STICK_HOVER - drone.STICK_L
elif dy < -50:
    drone.thr = drone.STICK_HOVER + drone.STICK_L
else:
    drone.thr = drone.STICK_HOVER

追従速度を速くし過ぎると行ったり来たり不安定になり、遅くし過ぎると定位置に戻るのに時間がかかったりと、調整が難しいところです。Telloのジョイスティックコマンドは1024±660の範囲で制御されますが、いろいろ調整した結果、±60で追従させるようにしました。

以下、顔認識と追尾のイメージです。

image2.png

ゆっくりな動きにしか追従できず、さすがに上位機種のようにはいきませんが、アイデアとプログラミング次第で手軽に実験できるところがTelloのおもしろいところです。

ソースコード

drone.py
import tello
import numpy as np
import cv2
import sys
import time
import datetime
import math as m

LOCAL_IP = '192.168.10.2'
LOCAL_PORT_VIDEO = '8080'

# Center Cordinates
CX = 480
CY = 360

# Reference Distance
L0 = 100
S0 = 25600

# Base Distance
LB = 120

# Initialize Tracker
def tracker_init(frame):
    global bbox
    rc = 1
    w_cur = 0
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    for (x,y,w,h) in faces:
        if w >= w_cur:
            bbox = (x,y,w,h)
            w_cur = w
        bbox = (x,y,w,h)
    if w_cur > 0:
        rc = 0
    return rc

# Create Tracker
def tracker_create(tracker_type):
    global tracker
    if tracker_type == 'BOOSTING':
        tracker = cv2.TrackerBoosting_create()
    if tracker_type == 'MIL':
        tracker = cv2.TrackerMIL_create()
    if tracker_type == 'KCF':
        tracker = cv2.TrackerKCF_create()
    if tracker_type == 'TLD':
        tracker = cv2.TrackerTLD_create()
    if tracker_type == 'MEDIANFLOW':
        tracker = cv2.TrackerMedianFlow_create()
    if tracker_type == 'GOTURN':
        tracker = cv2.TrackerGOTURN_create()


if __name__ == '__main__':

    drone = tello.Tello()
    addr = 'udp://' + LOCAL_IP + ':' + str(LOCAL_PORT_VIDEO) + '?overrun_nonfatal=1&fifo_size=50000000'
    cap = cv2.VideoCapture(addr)

    # Set Cascade Classifier
    face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

    # Initialize Tracker
    tracker_types = ['BOOSTING', 'MIL','KCF', 'TLD', 'MEDIANFLOW', 'GOTURN']
    tracker_type = tracker_types[4]
    tracker_create(tracker_type)
    bbox = (480, 360, 160, 160)
    tracking = 0

    while(cap.isOpened()):
        ret, frame = cap.read()
        if ret == True:
            # Key '9' enable/disable tracking
            if drone.is_tracking:
                if drone.is_detect:

                    if tracking == 0 or tracking == 2:
                        rc = tracker_init(frame)
                        if rc == 0:
                            if tracking == 0:
                                ret = tracker.init(frame, bbox)
                            else:
                                tracker_create(tracker_type)
                                ret = tracker.init(frame, bbox)
                            tracking = 1
                            drone.tracking_interval = 0.1
                    else:
                        ret, bbox = tracker.update(frame)
                        # Draw bounding box
                        if ret:
                            p1 = (int(bbox[0]), int(bbox[1]))
                            p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
                            cv2.rectangle(frame, p1, p2, (255,0,0), 2, 1)
                        else :
                            print("Tracking failure detected")
                            tracking = 2
                            drone.tracking_interval = 2

                    drone.is_detect = False

                try:
                    x = int(bbox[0])
                    y = int(bbox[1])
                    w = int(bbox[2])
                    h = int(bbox[3])
                    if w > 0:
                        d = round(L0 * m.sqrt(S0 / (w * h)))
                        dx = x + w/2 - CX
                        dy = y + h/2 - CY
                    else:
                        d = LB
                    cv2.putText(frame, ' D:' + str(d) + 'cm X:' + str(dx) + 'px Y:' + str(dy) + 'px', (360, 710), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 1, cv2.LINE_AA)
                    cv2.rectangle(frame,(x,y),(x+w,y+h),(255,0,0),2)
                    if drone.is_autopilot:
                        if (d - LB) > 15:
                            drone.pitch = drone.STICK_HOVER + drone.STICK_L
                        elif (d - LB) < -15:
                            drone.pitch = drone.STICK_HOVER - drone.STICK_L
                        else:
                            drone.pitch = drone.STICK_HOVER
                        if dx > 80:
                            drone.roll = drone.STICK_HOVER + drone.STICK_L
                        elif dx < -80:
                            drone.roll = drone.STICK_HOVER - drone.STICK_L
                        else:
                            drone.roll = drone.STICK_HOVER
                        if dy > 50:
                            drone.thr = drone.STICK_HOVER - drone.STICK_L
                        elif dy < -50:
                            drone.thr = drone.STICK_HOVER + drone.STICK_L
                        else:
                            drone.thr = drone.STICK_HOVER
                except Exception:
                    break
            else:
                tracking = 0
            cv2.putText(frame, 'Tracking:' + str(drone.is_tracking) + ' AutoPilot:' + str(drone.is_autopilot), (5, 710), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 1, cv2.LINE_AA)
            cv2.imshow("frame", frame)
            k = cv2.waitKey(1)
            if drone.stop_drone:
                print('stop: ' + str(drone.stop_drone))
                time.sleep(1)
                break
    cap.release()
    cv2.destroyAllWindows()
tello.py
import socket
from struct import Struct
import sys
import termios
import fcntl
import os
from getch import getch
from pynput.keyboard import Key, Listener
from threading import Thread, Timer
import time
import datetime


class Tello:

    TELLO_IP = '192.168.10.1'
    TELLO_PORT_CMD = 8889
    TELLO_PORT_VIDEO = 6038
    LOCAL_IP = '192.168.10.2'
    LOCAL_PORT_VIDEO = 8080

    # Tello Commands
    CMD_CONN_REQ = 'conn_req:'.encode() + TELLO_PORT_VIDEO.to_bytes(2,'little')
    CMD_REQ_IFRAME =(0xcc, 0x58, 0x00, 0x7c, 0x60, 0x25, 0x00, 0x00, 0x00, 0x6c, 0x95)
    CMD_TAKEOFF = (0xcc, 0x58, 0x00, 0x7c, 0x68, 0x54, 0x00, 0x01, 0x00, 0x6a, 0x90)
    CMD_LAND = (0xcc, 0x60, 0x00, 0x27, 0x68, 0x55, 0x00, 0x02, 0x00, 0x00, 0xc6, 0x5b)
    CMD_FLIGHT = (0xcc, 0xb0, 0x00, 0x7f, 0x60, 0x50, 0x00, 0x00, 0x00)
    STICK_HOVER = 1024
    STICK_H = 660
    STICK_M = 330
    STICK_L = 60

    # Format
    S11 = Struct("!11B")
    S12 = Struct("!12B")
    S22 = Struct("!22B")

    # CTC16 Table
    TBL_CRC16 = [
        0x0000, 0x1189, 0x2312, 0x329b, 0x4624, 0x57ad, 0x6536, 0x74bf, 0x8c48, 0x9dc1, 0xaf5a, 0xbed3, 0xca6c, 0xdbe5, 0xe97e, 0xf8f7,
        0x1081, 0x0108, 0x3393, 0x221a, 0x56a5, 0x472c, 0x75b7, 0x643e, 0x9cc9, 0x8d40, 0xbfdb, 0xae52, 0xdaed, 0xcb64, 0xf9ff, 0xe876,
        0x2102, 0x308b, 0x0210, 0x1399, 0x6726, 0x76af, 0x4434, 0x55bd, 0xad4a, 0xbcc3, 0x8e58, 0x9fd1, 0xeb6e, 0xfae7, 0xc87c, 0xd9f5,
        0x3183, 0x200a, 0x1291, 0x0318, 0x77a7, 0x662e, 0x54b5, 0x453c, 0xbdcb, 0xac42, 0x9ed9, 0x8f50, 0xfbef, 0xea66, 0xd8fd, 0xc974,
        0x4204, 0x538d, 0x6116, 0x709f, 0x0420, 0x15a9, 0x2732, 0x36bb, 0xce4c, 0xdfc5, 0xed5e, 0xfcd7, 0x8868, 0x99e1, 0xab7a, 0xbaf3,
        0x5285, 0x430c, 0x7197, 0x601e, 0x14a1, 0x0528, 0x37b3, 0x263a, 0xdecd, 0xcf44, 0xfddf, 0xec56, 0x98e9, 0x8960, 0xbbfb, 0xaa72,
        0x6306, 0x728f, 0x4014, 0x519d, 0x2522, 0x34ab, 0x0630, 0x17b9, 0xef4e, 0xfec7, 0xcc5c, 0xddd5, 0xa96a, 0xb8e3, 0x8a78, 0x9bf1,
        0x7387, 0x620e, 0x5095, 0x411c, 0x35a3, 0x242a, 0x16b1, 0x0738, 0xffcf, 0xee46, 0xdcdd, 0xcd54, 0xb9eb, 0xa862, 0x9af9, 0x8b70,
        0x8408, 0x9581, 0xa71a, 0xb693, 0xc22c, 0xd3a5, 0xe13e, 0xf0b7, 0x0840, 0x19c9, 0x2b52, 0x3adb, 0x4e64, 0x5fed, 0x6d76, 0x7cff,
        0x9489, 0x8500, 0xb79b, 0xa612, 0xd2ad, 0xc324, 0xf1bf, 0xe036, 0x18c1, 0x0948, 0x3bd3, 0x2a5a, 0x5ee5, 0x4f6c, 0x7df7, 0x6c7e,
        0xa50a, 0xb483, 0x8618, 0x9791, 0xe32e, 0xf2a7, 0xc03c, 0xd1b5, 0x2942, 0x38cb, 0x0a50, 0x1bd9, 0x6f66, 0x7eef, 0x4c74, 0x5dfd,
        0xb58b, 0xa402, 0x9699, 0x8710, 0xf3af, 0xe226, 0xd0bd, 0xc134, 0x39c3, 0x284a, 0x1ad1, 0x0b58, 0x7fe7, 0x6e6e, 0x5cf5, 0x4d7c,
        0xc60c, 0xd785, 0xe51e, 0xf497, 0x8028, 0x91a1, 0xa33a, 0xb2b3, 0x4a44, 0x5bcd, 0x6956, 0x78df, 0x0c60, 0x1de9, 0x2f72, 0x3efb,
        0xd68d, 0xc704, 0xf59f, 0xe416, 0x90a9, 0x8120, 0xb3bb, 0xa232, 0x5ac5, 0x4b4c, 0x79d7, 0x685e, 0x1ce1, 0x0d68, 0x3ff3, 0x2e7a,
        0xe70e, 0xf687, 0xc41c, 0xd595, 0xa12a, 0xb0a3, 0x8238, 0x93b1, 0x6b46, 0x7acf, 0x4854, 0x59dd, 0x2d62, 0x3ceb, 0x0e70, 0x1ff9,
        0xf78f, 0xe606, 0xd49d, 0xc514, 0xb1ab, 0xa022, 0x92b9, 0x8330, 0x7bc7, 0x6a4e, 0x58d5, 0x495c, 0x3de3, 0x2c6a, 0x1ef1, 0x0f78
    ]

    def __init__(self):

        self.stop_drone = False

        self.addr_cmd_tx = (self.TELLO_IP, self.TELLO_PORT_CMD)
        self.sock_cmd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Start Receiving Command
        #self.thread_cmd_rx = Thread(target=self._cmd_rx)
        #self.thread_cmd_rx.start()

        # Send Connection Request
        self._cmd_tx(self.CMD_CONN_REQ)

        # Initial Terminal Settings
        self.fd = sys.stdin.fileno()
        self.attr_org = termios.tcgetattr(self.fd)
        self.fcntl_org = fcntl.fcntl(self.fd, fcntl.F_GETFL)

        # Cancel Echo
        self._echo_off()

        # Initialize Tracking Interval
        self.tracking_interval = 2

        # Start Key Listener
        self.thread_key_listener = Thread(target=self._key_listener)
        self.thread_key_listener.start()

        # Initialize Flight Status
        self.in_flight = False

        # Initialize Flight Command
        self.mode = 0
        self.yaw = self.STICK_HOVER
        self.thr = self.STICK_HOVER
        self.pitch = self.STICK_HOVER
        self.roll = self.STICK_HOVER

        # Start Sending Flight Command
        self.thread_flight_ctrl = Thread(target=self._flight_ctrl)
        self.thread_flight_ctrl.start()

        # Start Requesting I-Frame
        self.thread_req_iframe = Thread(target=self._req_iframe)
        self.thread_req_iframe.start()

        # Start Forwarding Video
        self.thread_fwd_video = Thread(target=self._fwd_video)
        self.thread_fwd_video.start()

        # Initialize Tracking Flag
        self.is_tracking = False
        self.is_detect = False
        self.is_autopilot = False

        # Start Tracking Timer
        self.thread_timer_detect = Thread(target=self._timer_detect)
        self.thread_timer_detect.start()

    def _echo_off(self):
        attr = termios.tcgetattr(self.fd)
        attr[3] = attr[3] & ~termios.ECHO & ~termios.ICANON # & ~termios.ISIG
        termios.tcsetattr(self.fd, termios.TCSADRAIN, attr)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, self.fcntl_org | os.O_NONBLOCK)

    def _echo_on(self):
        fcntl.fcntl(self.fd, fcntl.F_SETFL, self.fcntl_org)
        termios.tcsetattr(self.fd, termios.TCSANOW, self.attr_org)

    def _key_listener(self):
        with Listener(
            on_press = self._on_press,
            on_release = self._on_release
        ) as listener:
            listener.join()

    def _on_press(self, key):
        try:
            keyPressed = '{0}'.format(key.char)
            if not self.is_tracking and keyPressed == '9':
                self.is_tracking = True
            elif self.is_tracking and keyPressed == '9':
                self.is_tracking = False
                self.is_autopilot = False
            elif self.is_tracking and not self.is_autopilot and keyPressed == '0':
                self.is_autopilot = True
            elif self.is_tracking and self.is_autopilot and keyPressed == '0':
                self.is_autopilot = False
            elif not self.is_autopilot:
                if keyPressed == 'W':
                    self.thr = self.STICK_HOVER + self.STICK_H
                elif keyPressed == 'w':
                    self.thr = self.STICK_HOVER + self.STICK_M
                elif keyPressed == 'S':
                    self.thr = self.STICK_HOVER - self.STICK_H
                elif keyPressed == 's':
                    self.thr = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'A':
                    self.yaw = self.STICK_HOVER - self.STICK_H
                elif keyPressed == 'a':
                    self.yaw = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'D':
                    self.yaw = self.STICK_HOVER + self.STICK_H
                elif keyPressed == 'd':
                    self.yaw = self.STICK_HOVER + self.STICK_M
                elif keyPressed == 'I':
                    self.pitch = self.STICK_HOVER + self.STICK_H
                elif keyPressed == 'i':
                    self.pitch = self.STICK_HOVER + self.STICK_M
                elif keyPressed == 'K':
                    self.pitch = self.STICK_HOVER - self.STICK_H
                elif keyPressed == 'k':
                    self.pitch = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'J':
                    self.roll = self.STICK_HOVER - self.STICK_H
                elif keyPressed == 'j':
                    self.roll = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'L':
                    self.roll = self.STICK_HOVER + self.STICK_H
                elif keyPressed == 'l':
                    self.roll = self.STICK_HOVER + self.STICK_M
                else:
                    self.thr = self.STICK_HOVER
                    self.yaw = self.STICK_HOVER
                    self.pitch = self.STICK_HOVER
                    self.roll = self.STICK_HOVER
        except AttributeError:
            keyPressed = '{0}'.format(key)
            if not self.in_flight and keyPressed == 'Key.space':
                cmd = list(self.CMD_TAKEOFF)
                self._cmd_tx(cmd)
                self.in_flight = True
            elif self.in_flight and keyPressed == 'Key.space':
                cmd = list(self.CMD_LAND)
                self._cmd_tx(cmd)
                self.in_flight = False
            elif not self.in_flight and keyPressed == 'Key.enter':
                self.stop_drone = True
                self._echo_on()
                while True:
                    clearBuffer = getch()
                    if clearBuffer == '\n':
                        break
                return False
            elif not self.is_autopilot:
                if keyPressed == 'Key.up':
                    self.pitch = self.STICK_HOVER + self.STICK_M
                elif keyPressed == 'Key.down':
                    self.pitch = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'Key.left':
                    self.roll = self.STICK_HOVER - self.STICK_M
                elif keyPressed == 'Key.right':
                    self.roll = self.STICK_HOVER + self.STICK_M
                else:
                    self.thr = self.STICK_HOVER
                    self.yaw = self.STICK_HOVER
                    self.pitch = self.STICK_HOVER
                    self.roll = self.STICK_HOVER

    def _on_release(self, key):
        if not self.is_autopilot:
            self.thr = self.STICK_HOVER
            self.yaw = self.STICK_HOVER
            self.pitch = self.STICK_HOVER
            self.roll = self.STICK_HOVER
        else:
            return

    def _cmd_rx(self):
        while True:
            try:
                data, server = self.sock_cmd.recvfrom(1518)
                print('Rx: ' + str(data))
            except Exception as e:
                print(e)
                break

    def _cmd_tx(self, cmd):
        if type(cmd) == bytes:
            cmd = cmd
            self.sock_cmd.sendto(cmd, self.addr_cmd_tx)
        elif type(cmd) == list:
            if len(cmd) == 11:
                s = self.S11
            elif len(cmd) == 12:
                s = self.S12
            elif len(cmd) == 22:
                s = self.S22
            else:
                return
                #print('Tx: unknown format')
            if s:
                cmd = s.pack(*cmd)
                self.sock_cmd.sendto(cmd, self.addr_cmd_tx)
                #print('Tx: ' + str(cmd))

    def _flight_ctrl(self):
        c = (self.mode<<44) + (self.yaw<<33) + (self.thr<<22) + (self.pitch<<11) + (self.roll)
        cmd = list(self.CMD_FLIGHT)
        for i in range(0,6):
            cmd.append(c>>(8*i)&0xff)
        now = datetime.datetime.now()
        h = now.hour
        m = now.minute
        s = now.second
        ms = round(now.microsecond / 1000)
        cmd.append(h)
        cmd.append(m)
        cmd.append(s)
        cmd.append(ms & 0xff)
        cmd.append(ms >> 8)
        buf = bytearray()
        for b in cmd:
            buf.append(b)
        crc16 = self._calc_crc16(buf, len(buf))
        cmd.append(crc16 & 0xff)
        cmd.append(crc16 >> 8)
        self._cmd_tx(cmd)
        if not self.stop_drone:
            t = Timer(0.02, self._flight_ctrl)
            t.start()

    def _req_iframe(self):
        cmd = list(self.CMD_REQ_IFRAME)
        self._cmd_tx(cmd)
        if not self.stop_drone:
            t = Timer(1, self._req_iframe)
            t.start()
        else:
            self.sock_cmd.close()

    def _fwd_video(self):
        self.sock_video = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addr_video = (self.LOCAL_IP, self.TELLO_PORT_VIDEO)
        self.sock_video.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock_video.settimeout(.5)
        self.sock_video.bind(self.addr_video)
        self.sock_fwd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addr_fwd = (self.LOCAL_IP, self.LOCAL_PORT_VIDEO)
        self.sock_fwd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock_fwd.settimeout(.5)
        data = bytearray(4096)
        slice = bytearray()
        isSps = False

        while not self.stop_drone:
            try:
                size, addr = self.sock_video.recvfrom_into(data)
            except socket.timeout:
                time.sleep(.5)
                continue
            except socket.error as e:
                print(e)
                break
            else:
                if size > 6 and data[2] == 0x00 and data[3] == 0x00 and data[4] == 0x00 and data[5] == 0x01:
                    nal_type = data[6] & 0x1f
                    if nal_type == 7:
                        isSps = True
                if isSps:
                    self.sock_fwd.sendto(data[2:size], self.addr_fwd)
        self.sock_video.close()
        self.sock_fwd.close()

    def _timer_detect(self):
        self.is_detect = True
        if not self.stop_drone:
            t = Timer(self.tracking_interval, self._timer_detect)
            t.start()

    def _calc_crc16(self, buf, size):
        i = 0
        seed = 0x3692
        while size > 0:
            seed = self.TBL_CRC16[(seed ^ buf[i]) & 0xff] ^ (seed >> 8)
            i = i + 1
            size = size - 1

        return seed

実行方法

キー入力検知のためsudoで実行します。

sudo python drone.py

参考

以下のサイトを参考にさせていただきました。感謝。

67
65
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
67
65