LoginSignup
14
11

More than 3 years have passed since last update.

「Tello_Video」のTelloクラスを使おう

Last updated at Posted at 2019-10-03

はじめに

このページは,

公式SDK「Tello-Python」を試そう

の1ページです.
全体を見たい場合は上記ページへお戻りください.

概要

DJI公式のTello用Pythonサンプルプログラム「Tello-Python」のうち,
Tello_Video を前回の記事で改変しました.

しかし,GUIウィンドウを作るライブラリ「Tkinter」を利用している部分は変更していません.

  • ウィンドウ無しで操縦したい
  • 画像ウィンドウはOpenCVのimshow関数で十分だよ

という人も多いのではないでしょうか.

今回は,それにチャレンジするための準備段階として,「Tello-Python」の中で最も重要なコア部分であるtello.pyのTelloクラスについて説明しようと思います.

前提条件

ホームフォルダにTello-Pythonがインストールされているという前提で話を進めます.

Linuxマシンであれば /home/(ユーザー名)/ に,Tello-Pythonというフォルダがあることになります.

詳しくは Tello-Pythonのダウンロード を御覧ください.

tello.pyの役割

「Tello_Video」のシステム構成を改めて見てみます.
tello_video_system.png

main.pyが実質たった8行のプログラムでした(前回の記事)から,TelloとのUDP通信は全てtello.pyが行っていることになります.

また,Telloから送られてきたH.264の映像データをOpenCVで使える画像データに変換してくれているのもtello.pyです.

つまり,tello.py(Telloクラス)さえ使いこなせれば,機体操縦も画像取得も全部できるのです.
複雑なtello_control_ui.pyなんか不要になります.

それでは,次項からtello.pyを利用した簡単なプログラムを作ってみましょう.

Telloからバッテリー残量を呼び出すプログラム

今回は,tello.pyのTelloクラスを利用して,バッテリーの残量を問い合わせるプログラムを作成します.

新しいディレクトリの作成

まずは,Tello-Pythonディレクトリの下に,新しいディレクトリTello-batteryを作ります.

Tello-batteryディレクトリを作成
$ cd ~/Tello-Python/
$ mkdir Tello-battery
$ cd Tello-battery

ソースプログラムの作成

このディレクトリに,以下の2つのファイルを置きます.

ダウンロードする場合は,以下のリンクを右クリックして[名前を付けて保存]などの機能を利用してください.

  main.py   tello.py

記事からコピー&ペーストする場合は,tello.py行数が多すぎるので,この記事の末尾に置かせてもらいました.
ここ へ移動して,main.pytello.pyそれぞれコピーしてテキストエディタ等で保存してください.

このmain.pytello.pyは,「Tello_Video」のそれをベースに改造し,日本語コメントを付けてあります.

libh264decoder.soをコピー

システム構成図からもわかるように,tello.py(Telloクラス)の動作には,必ずlibh264decoder.soが必要です.
「Tello_Video」を動かす際にビルドしたものがあるので,それをTello-batteryへコピーします.

H.264デコーダをコピー
$ cp ../Tello_Video/libh264decoder.so ./

以上で準備完了です.
Tello-batteryディレクトリには,3つのファイルがあるはずです.確認しましょう.

$ ls
libh264decoder.so  main.py  tello.py

この3つが最低限の動作プログラムになります.

プログラムの実行

「Tello_Video」の時と同様に,プログラム本体はmain.pyなので,これをコマンドラインから実行します.

プログラムの実行
$ python main.py

ctrl+cを押すことで,プログラムを終了できます.

実行結果

問題なく動作すれば,以下の様になるはずです.

実行結果
$ python main.py
sent: command
sent: streamon
>> send cmd: battery?
ok
[h264 @ 0x1f00e60] non-existing PPS 0 referenced
[h264 @ 0x1f00e60] non-existing PPS 0 referenced
[h264 @ 0x1f00e60] decode_slice_header error
[h264 @ 0x1f00e60] no frame!
[h264 @ 0x1f00e60] non-existing PPS 0 referenced
[h264 @ 0x1f00e60] non-existing PPS 0 referenced
[h264 @ 0x1f00e60] decode_slice_header error
[h264 @ 0x1f00e60] no frame!
>> send cmd: battery?
90

H.264デコーダの出力に混じって,バッテリー残量の問い合わせコマンドbattery?の送信と,応答結果の90(パーセント)が表示されていることがわかります.

main.pyの解説

ではmain.pyの中身を見てみます.

main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import tello    # tello.pyをインポート
import time # time.sleepを使いたいので

# メイン関数本体
def main():
    # 初期化部
    # Telloクラスを使って,droneというインスタンス(実体)を作る
    drone = tello.Tello('', 8889) 

    # ループ部
    try:
        while True: #Ctrl+cが押されるまでループ
            print( drone.get_battery() )    # バッテリー残量を問い合わせてプリント
            time.sleep(0.3) # 0.3s待つ

    except( KeyboardInterrupt, SystemExit):    # Ctrl+cが押されたら離脱
        print( "SIGINTを検知" )

    # 終了処理部
    del drone   # telloクラスを削除


# "python main.py"として実行された時だけ動く様にするおまじない処理
if __name__ == "__main__":    # importされると"__main__"は入らないので,実行かimportかを判断できる.
    main()    # メイン関数を実行

shebangと文字コード指定

オリジナルのtello.pyと異なり,ソースファイルの冒頭には

shebangと文字コード
#!/usr/bin/env python
# -*- coding: utf-8 -*-

と書いてあります.

この2行は,Pythonプログラムでは「おまじない」と言われている部分で,必ず書いておいたほうが良い,とされています.

1行目の#!/usr/bin/env pythonは,スクリプトを実行するインタプリタを指定しており,「shebang」と言います.
2行目の# -*- coding: utf-8 -*-は,文字コードがutf-8であることを明示しています.
今回のプログラムは日本語でコメントが書いてあるので,2行目が無いとエラーになってしまうのです.

import部分

tello.pyのTelloクラスを使いたいので,import telloと書いてあります.
こう書くことで,main.pyと同じディレクトリにあるtello.pyを呼び出してくれます.

インポート
import tello   # tello.pyをインポート
import time    # time.sleepを使いたいので

メイン関数

メイン関数はdefで定義された関数なので,ここに書かれている段階では実行されません.プログラム末尾のmain()で実行されます.

メイン関数の中身は大きく分けて3つの部分に分かれています.
「初期化」「ループ」「終了処理」です.

メイン関数
# メイン関数本体
def main():
    初期化部

    ループ部

    終了処理部

それぞれ解説していきます.

初期化部

初期化部分は1行だけです.
tello.pyの中で定義されているTelloクラスを使って,droneという名前の実体を作っています.

初期化処理部
    # Telloクラスを使って,droneというインスタンス(実体)を作る
    drone = tello.Tello('', 8889) 

droneを作ったので,これ以降はdrone.メンバ名という書式でTelloクラスの中身(メンバ)を使うことができるようになりました.

ループ部

ループ部は実質3行です.
しかし,ctrl+cキーを押すことで終了させるためにtry except処理で挟んであります.

ループ部
    try:
        while True: #Ctrl+cが押されるまでループ
            print( drone.get_battery() )    # バッテリー残量を問い合わせてプリント
            time.sleep(0.3) # 0.3s待つ

    except( KeyboardInterrupt, SystemExit):    # Ctrl+cが押されたら離脱
        print( "SIGINTを検知" )

まずはwhile Trueで永久ループを作っています.ctrl+cの検知はtry exceptにお任せです.
drone.get_battery()で,Telloのバッテリー残量を得ることができます.
Telloクラスの内部では,UDP通信でTelloに"battery?"の文字列を送信し,応答データにバッテリー残量の文字列が入っているのです.
その残量をprintで画面出力しています.
whileループが速すぎるとCPUパワーが100%になってしまうので,適当にtime.sleepでウェイト(待ち時間)を入れています.

終了処理部

終了処理も簡単に1行です.

終了処理部
    del drone   # telloクラスを削除

初期化部で作ったdroneというインスタンスを,ちゃんと削除してメモリを開放しないといけませんから,delで殺しています.

おわりに

tello.pyのTelloクラスを使う,簡単なプログラム(main.py)を書いてみました.
Telloクラスを使うのって,実は簡単なんです.

Telloクラスには,他のget系のメンバとして

  • get_height()
  • get_flight_time()
  • get_speed()

があるので,get_battery()と同様の方法で現在状態をprintしてみると良いでしょう.

とは言え,情報を見るだけなんてつまらないですね.
次回は,キーボード入力でTelloを操作してみます.

今回使ったプログラム

main.py(Tello-batteryバージョン)

main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import tello    # tello.pyをインポート
import time # time.sleepを使いたいので

# メイン関数本体
def main():

    # Telloクラスを使って,droneというインスタンス(実体)を作る
    drone = tello.Tello('', 8889) 

    #Ctrl+cが押されるまでループ
    try:
        while True:
            print( drone.get_battery() )    # バッテリー残量を問い合わせてプリント
            time.sleep(0.3) # 0.3s待つ

    except( KeyboardInterrupt, SystemExit):    # Ctrl+cが押されたら離脱
        print( "SIGINTを検知" )

    # telloクラスを削除
    del drone


# "python main.py"として実行された時だけ動く様にするおまじない処理
if __name__ == "__main__":      # importされると"__main__"は入らないので,実行かimportかを判断できる.
    main()    # メイン関数を実行

tello.py(日本語コメント付き)

クラス内部で,「コマンド応答」と「ビデオ受信」のスレッドを作り,永久ループ的に回ります.
main.pyの永久ループを含めれば,合計3本のスレッドが並行して走ることになるわけです.

基本的には,デフォルトのdrone = tello.Tello('', 8889)この使い方で十分です.
複数台のEDUを使う方法は,今後の記事で書こうと思います.

tello.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket           # UDP通信用
import threading        # マルチスレッド用
import time             # ウェイト時間用
import numpy as np      # 画像データの配列用
import libh264decoder   # H.264のデコード用(自分でビルドしたlibh264decoder.so)

class Tello:
    """Telloドローンと通信するラッパークラス"""

    def __init__(self, local_ip, local_port, imperial=False, command_timeout=.3, tello_ip='192.168.10.1', tello_port=8889):
        """
        クラスの初期化.ローカルのIP/ポートをバインドし,Telloをコマンドモードにする.

        :param local_ip (str): バインドする(UDPサーバにする)ローカルのIPアドレス
        :param local_port (int): バインドするローカルのポート番号
        :param imperial (bool): Trueの場合,速度の単位はマイル/時,距離の単位はフィート.
                                Falseの場合, 速度の単位はkm/h,距離はメートル.デフォルトはFalse
        :param command_timeout (int|float): コマンドの応答を待つ時間.デフォルトは0.3秒.
        :param tello_ip (str): TelloのIPアドレス.EDUでなければ192.168.10.1
        :param tello_port (int): Telloのポート.普通は8889
        """

        self.abort_flag = False     # 中断フラグ
        self.decoder = libh264decoder.H264Decoder() # H.264のデコード関数を登録
        self.command_timeout = command_timeout      # タイムアウトまでの時間
        self.imperial = imperial    # 速度と距離の単位を選択
        self.response = None    # Telloが応答したデータが入る
        self.frame = None       # BGR並びのnumpy配列 -- カメラの出力した現在の画像
        self.is_freeze = False  # カメラ出力を一時停止(フリーズ)するかどうかのフラグ
        self.last_frame = None  # 一時停止時に出力する画像
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)        # コマンド送受信のソケット
        self.socket_video = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # ビデオストリーム受信用のソケット
        self.tello_address = (tello_ip, tello_port)     # IPアドレスとポート番号のタプル(変更不可能)
        self.local_video_port = 11111                   # ビデオ受信のポート番号
        self.last_height = 0                            # get_heightで確認した最終の高度
        self.socket.bind((local_ip, local_port))        # コマンド受信のUDPサーバのスタート(バインド)

        # コマンドに対する応答の受信スレッド
        self.receive_thread = threading.Thread(target=self._receive_thread)     # スレッドの作成
        self.receive_thread.daemon = True   # メインプロセスの終了と一緒にスレッドが死ぬように設定

        self.receive_thread.start()         # スレッドスタート

        # ビデオ受信の開始 -- コマンド送信: command, streamon
        self.socket.sendto(b'command', self.tello_address)          # 'command'を送信し,TelloをSDKモードに
        print ('sent: command')
        self.socket.sendto(b'streamon', self.tello_address)         # 'streamon'を送信し,ビデオのストリーミングを開始
        print ('sent: streamon')

        self.socket_video.bind((local_ip, self.local_video_port))   # ビデオ受信のUDPサーバのスタート(バインド)

        # ビデオ受信のスレッド
        self.receive_video_thread = threading.Thread(target=self._receive_video_thread)     # スレッドの作成
        self.receive_video_thread.daemon = True # メインプロセスの終了と一緒にスレッドが死ぬように設定

        self.receive_video_thread.start()       # スレッドスタート

    def __del__(self):
        """ローカルのソケットを閉じる"""

        self.socket.close()         # コマンド送受信のソケットを閉じる
        self.socket_video.close()   # ビデオ受信のソケットを閉じる

    def read(self):
        """カメラで受信した最新の画像を返す"""
        if self.is_freeze:          # 一時停止フラグがTrueのときは,保存してある画像を返す
            return self.last_frame
        else:                       # そうでないときは,最新の画像を返す
            return self.frame

    def video_freeze(self, is_freeze=True):
        """ビデオ出力の一時停止 -- is_freezeフラグをTrueにセットすること"""
        self.is_freeze = is_freeze  # 一時停止フラグの状態をセット
        if is_freeze:               # Trueのときは,現在の画像をlast_frameに保存しておく
            self.last_frame = self.frame

    def _receive_thread(self):
        """
        Telloからの応答を監視する

        スレッドとして走らせる.Telloが最後に返した応答をself.responseに格納する

        """
        while True:
            try:
                self.response, ip = self.socket.recvfrom(3000)      # Telloからの応答を受信(最大3000バイトまで一度に受け取れる)
                #print(self.response)
            except socket.error as exc:     # エラー時の処理
                print ("Caught exception socket.error : %s" % exc)

    def _receive_video_thread(self):
        """
        Telloからのビデオストリーミング(H.264のrawデータ)を監視する

        スレッドとして走らせる.Telloから受信した最新の画像をself.frameに格納する

        """
        packet_data = ""    # 変数を初期化
        while True:
            try:
                res_string, ip = self.socket_video.recvfrom(2048)   # Telloからの画像データを受信(最大2048バイトまで一度に受け取れる)
                packet_data += res_string       # packet_dataに受信データを連結して1つの長いデータにする
                # フレームの最後
                if len(res_string) != 1460:     # 受信データのバイト数が1460以外のとき,packet_dataをデコードしframeを得る.
                    for frame in self._h264_decode(packet_data):    # デコードしたデータには何枚分かの画像が入っているので,枚数分繰り返す
                        self.frame = frame
                    packet_data = ""    # 変数を初期化

            except socket.error as exc:
                print ("Caught exception socket.error : %s" % exc)

    def _h264_decode(self, packet_data):
        """
        Telloから受信したH.264の生データをデコードする

        :param packet_data: H.264のrawデータ

        :return: デコードされた画像のリスト(複数枚の画像が入っていることもある)
        """
        res_frame_list = []     # リストの初期化
        frames = self.decoder.decode(packet_data)   # packet_dataをデコードする
        for framedata in frames:    # 何枚分かの画像が入っているので,枚数分繰り返す
            (frame, w, h, ls) = framedata   # データの分解
            if frame is not None:   # frameの中身が空でないとき
                # print 'frame size %i bytes, w %i, h %i, linesize %i' % (len(frame), w, h, ls)

                frame = np.fromstring(frame, dtype=np.ubyte, count=len(frame), sep='')      # 文字列データをnp.ubyte型の配列に作りなおす
                frame = (frame.reshape((h, ls / 3, 3)))     # RGBを考慮して3次元配列にする
                frame = frame[:, :w, :]                     # 画像の幅のぶんだけ取り出し,右側のゴミは捨てる
                res_frame_list.append(frame)                # リストの要素として追加

        return res_frame_list   # 複数枚の画像が入ったリストとして返す

    def send_command(self, command):
        """
        Telloへコマンドを送信し,応答を待つ

        :param command: 送信するコマンド
        :return (str): Telloの応答

        """

        print (">> send cmd: {}".format(command))
        self.abort_flag = False     # 中断フラグを倒す
        timer = threading.Timer(self.command_timeout, self.set_abort_flag)      # タイムアウト時間が立ったらフラグを立てるタイマースレッドを作成

        self.socket.sendto(command.encode('utf-8'), self.tello_address)     # コマンドを送信

        timer.start()   # スレッドスタート
        while self.response is None:        # タイムアウト前に応答が来たらwhile終了
            if self.abort_flag is True:     # タイムアウト時刻になったらブレイク
                break
        timer.cancel()  # スレッド中断

        if self.response is None:       # 応答データが無い時
            response = 'none_response'
        else:                           # 応答データがあるとき
            response = self.response.decode('utf-8')

        self.response = None    # _receive_threadスレッドが次の応答を入れてくれるので,ここでは空にしておく

        return response     # 今回の応答データを返す

    def set_abort_flag(self):
        """
        self.abort_flagのフラグをTrueにする

        send_command関数の中のタイマーで呼ばれる.

        この関数が呼ばれるということは,応答が来なくてタイムアウトした,ということ.

        """

        self.abort_flag = True

    def takeoff(self):
        """
        離陸開始

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.send_command('takeoff')

    def set_speed(self, speed):
        """
        スピードを設定

        この関数の引数にはkm/hかマイル/hを使う.
        Tello APIは 1〜100 センチメートル/秒を使う

        Metric: .1 to 3.6 km/h
        Imperial: .1 to 2.2 Mile/h

        Args:
            speed (int|float): スピード

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        speed = float(speed)

        if self.imperial is True:       # 単位系に応じて計算
            speed = int(round(speed * 44.704))      # Mile/h -> cm/s
        else:
            speed = int(round(speed * 27.7778))     # km/h -> cm/s

        return self.send_command('speed %s' % speed)

    def rotate_cw(self, degrees):
        """
        時計回りの旋回

        Args:
            degrees (int): 旋回角度, 1〜360度

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.send_command('cw %s' % degrees)

    def rotate_ccw(self, degrees):
        """
        反時計回りの旋回

        Args:
            degrees (int): 旋回角度, 1〜360度.

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """
        return self.send_command('ccw %s' % degrees)

    def flip(self, direction):
        """
        宙返り

        Args:
            direction (str): 宙返りする方向の文字, 'l', 'r', 'f', 'b'.

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.
        """

        return self.send_command('flip %s' % direction)

    def get_response(self):
        """
        Telloの応答を返す

        Returns:
            int: Telloの応答

        """
        response = self.response
        return response

    def get_height(self):
        """
        Telloの高度(dm)を返す

        Returns:
            int: Telloの高度(dm)

        """
        height = self.send_command('height?')
        height = str(height)
        height = filter(str.isdigit, height)
        try:
            height = int(height)
            self.last_height = height
        except:
            height = self.last_height
            pass
        return height

    def get_battery(self):
        """
        バッテリー残量をパーセンテージで返す

        Returns:
            int: バッテリー残量のパーセンテージ

        """

        battery = self.send_command('battery?')

        try:
            battery = int(battery)
        except:
            pass

        return battery

    def get_flight_time(self):
        """
        飛行時間を秒数で返す

        Returns:
            int: 飛行の経過時間

        """

        flight_time = self.send_command('time?')

        try:
            flight_time = int(flight_time)
        except:
            pass

        return flight_time

    def get_speed(self):
        """
        現在のスピードを返す

        Returns:
            int: 現在スピード, km/h または Mile/h

        """

        speed = self.send_command('speed?')

        try:
            speed = float(speed)

            if self.imperial is True:
                speed = round((speed / 44.704), 1)      # cm/s -> mile/h
            else:
                speed = round((speed / 27.7778), 1)     # cm/s -> km/h
        except:
            pass

        return speed

    def land(self):
        """
        着陸を開始

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.send_command('land')

    def move(self, direction, distance):
        """
        direction の方向へ distance の距離だけ移動する.

        この引数にはメートルまたはフィートを使う.
        Tello API は 20〜500センチメートルを使う.

        Metric: .02 〜 5 メートル
        Imperial: .7 〜 16.4 フィート

        Args:
            direction (str): 移動する方向の文字列,'forward', 'back', 'right' or 'left'.
            distance (int|float): 移動する距離.(メートルまたはフィート)

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        distance = float(distance)

        if self.imperial is True:
            distance = int(round(distance * 30.48))     # feet -> cm
        else:
            distance = int(round(distance * 100))       # m -> cm

        return self.send_command('%s %s' % (direction, distance))

    def move_backward(self, distance):
        """
        distance の距離だけ後進する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.move('back', distance)

    def move_down(self, distance):
        """
        distance の距離だけ降下する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.move('down', distance)

    def move_forward(self, distance):
        """
        distance の距離だけ前進する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """
        return self.move('forward', distance)

    def move_left(self, distance):
        """
        distance の距離だけ左移動する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """
        return self.move('left', distance)

    def move_right(self, distance):
        """
        distance の距離だけ右移動する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """
        return self.move('right', distance)

    def move_up(self, distance):
        """
        distance の距離だけ上昇する.

        Tello.move()のコメントを見ること.

        Args:
            distance (int): 移動する距離

        Returns:
            str: Telloからの応答.'OK'または'FALSE'.

        """

        return self.move('up', distance)
14
11
5

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11