キーボード操作の読取りと、動画フレームの読込みの2つを、ループ処理で繰り返す方法
TelloPyで、機体操作を行いながら、動画フレームを淀みなく取得するには、次の2つのループ処理を同時に行う必要があります。
- 【 ループ1 】 動画コンテナから、1フレームずつ、画像を取り出して受信するループ処理
- 【 ループ2 】 ユーザがPCのキーボードから打ち込む命令を、一定時間間隔に読み込むループ処理
( __ループ①と②の同時ループ__の例 )
ここでは、__DJITelloPy__付属のサンプルコードの事例を取り上げてみます。
- __github.com/damiafuentes/DJITelloPy__のサンプル用コード manual-control-opencv.py
tello.streamon() frame_read = tello.get_frame_read() #tello.takeoff() while True: # In reality you want to display frames in a seperate thread. Otherwise # they will freeze while the drone moves. img = frame_read.frame cv2.imshow("drone", img) key = cv2.waitKey(1) & 0xff if key == 27: # ESC break elif key == ord('w'): tello.move_forward(30) elif key == ord('s'): tello.move_back(30)
このサンプルコードでは、__key = cv2.waitKey(1) & 0xff__の部分で、ユーザからのキーボード入力を読み取っています。Macbookで動かしてみたところ、__key変数__を使っても、ユーザのキーボード入力を読み込むことができませんでした。
そこで、Python組込の__input()メソッド__を使うと、以下の事象にぶち当たりました。
【 ループ1 】 の1フレームを取り出して、そのフレームをデータ処理するコードの中で、ユーザのキー入力を取得しようとすると、ユーザがキー入力を指で打ち込んでいる間、次のフレームが読み込まれずに、ウィンドウの画像が固まってしまう。次々と新しいフレームを取りだして、__cv2.imshow()__していく繰り返し処理が、止まってしまうからです。
TerminalからEnterキーを連打すると、次々と次のフレーム画像が読み込まれて、Macbookのウィンドウに表示される画面は、Telloの移動にあわせて滑らかに動く、リアルタイムに近い映像になります。
また、ユーザが、0.5秒以内に、TerminalでEnterキーを打ち込まないと、__*While True:__で始まるループ処理の次の周に行く ( =次の画像フレームを取りに行く)ようなコードを書くのも、対策として有効です。
( 参考 )
( 対策コードの例 )
事前準備として、__pip3 install timeout-decorator__の実行が必要になります。
from timeout_decorator import timeout, TimeoutError TIMEOUT_SEC = 0.5 @timeout(TIMEOUT_SEC) def input_with_timeout(msg=None): return input(msg) if __name__ == '__main__': while True: try: input_str = input_with_timeout('\n{}秒以内に操作コマンドを入力して下さい : '.format(TIMEOUT_SEC)) print('\n操作コマンド: {} を受信しました。\n'.format(input_str)) except TimeoutError: print('\n操作コマンド入力時間切れ。次のフレーム画像を読み込みます。\n')
なお、__*input()*メソッド__を使って、キーボード入力を読取るコードに書き換える際には、注意すべきポイントがあります。注意点を、次の記事にまとました。
__この工夫を加えた__結果、次の記事では、__以下のすべてに成功__しました。
- Telloの移動中に、遅延なく画像を受信する。
- 受信した画像を、リアルタイムにMacbookのウィンドウに出力する。
- キーボード入力に反応して、意図した通りにTelloが動く。
- Telloが移動中の単眼カメラのフレーム画像が、滑らかにウィンドウに出力されている。
最後の項目は、Telloの移動が終わって、ホバリングしているときしか、フレーム画像が出力できない、という事象に直面しないことを意味しています。
【 方法2 】 動画取得の通信経路(スクリプトファイル)と、ドローン操作の通信経路(スクリプトファイル)を、切り分ける方法
その他、思いつく方法として、TelloとPCとの通信として、次のように独立した経路を2本立てられたらいい。
( 通信1 )
- Telloの単眼カメラの画像を、毎フレームずつ、常時継続してPC側に送ってくれる。
( 通信2 )
- PCからTelloに、操作コマンドを送出する。
( 通信3 )
- TelloからPCに、Telloの状態(バッテリ残量や高度、温度、など)を常時継続して、送信する。
ここで、各通信の動作は、別々のスクリプトファイルに記述したい。
そうすると、2つのループ処理をどうネストさせるか、という問題が、そもそも発生しない。
__ROS2__では、__異なるチャネルが、上記の「各通信」に該当する__だろうか。
【 方法3 】 ドローン操作をユーザから受け取る操作を行うループ処理の中から、画像取得を行う(事前定義済みの)関数メソッドを呼び出す方法
__第4のやり方__として、MuAuanさんが、次の方法をとっていました。
-1. 画像を1フレームずつ、コンテナから取り出してWindowに表示する処理を、事前に、関数(メソッド)として用意しておく。
-2. 一定時間ごとに、ユーザからドローン操作を受け取る処理を行うループ処理の中から、「1」の関数(メソッド)を呼び出す。
__@MuAuanさん__の記事のスクリプトファイルが、この方法を採用しています。
【 方法3ダッシュ 】 動画を取得するループ処理の中から、ドローン操作を行う(事前定義済みの)関数メソッドを呼び出す方法
なお、MuAuanさんが採用されているこの方法は、TelloPyを使って人物追跡を行うコードを実装した次の事例でも、採用されています。
( 取られている方法 )
- main.pyの中で、コンテナからカメラ画像を1つずつ取り出すループ処理の中で、drone.follow_personメソッドが呼び出されている。
for frame in container.decode(video=0)のループ処理の中に、以下のコードが記述されている。
img = detect.detection(image)
drone control
drone.follow_person(detect.bbox, detect.label, detect.score, 400*500)
- follow_personメソッド:このメソッドの中で、drone.forward()やdrone.backward()が呼び出されている。
( follow_personメソッドの定義場所 )
https://github.com/scepter914/TelloFollowingPerson/blob/master/drone_control.py
【 方法4 】 UDPの0.0.0.0:11111で、画像取得する方法
この方法は、試してみたところ、Macbookで、UDP受信に成功しなかったのですが、
次の参考コードの通り、スクリプトの最初と最後で、__streamonコマンドと、streamoffコマンドを送ると、後は、フレーム画像をループ処理で1枚1枚取りに行く__loopブロックを書かずに、Telloの単眼カメラ画像を自動で、次々と読み込んでくれます。
( 参考 )
sent = sock.sendto(b'streamon', tello_address)
( ドローン操作コードの記述 )
sent = sock.sendto(b'streamoff', tello_address)
方法3の実装コードを試してみた
まずは、__@MuAuanさん__スクリプトファイルをMacbookで動かしてみました。
( Macbookに入れてあるPyTelloのversion )
% pip3 list | grep tellopy
tellopy 0.7.0.dev0
%
実行結果
画像のウィンドウ表示もなく、Telloの離陸も行われずに、time outしてしまいました。
Traceback (most recent call last):
File "/Users/electron/Desktop/MuAuan/Tello/MuAuan_takeoffVideo.py", line 35, in main
drone.wait_for_connection(60.0)
File "/usr/local/lib/python3.9/site-packages/tellopy/_internal/tello.py", line 143, in wait_for_connection
raise error.TelloError('timeout')
tellopy._internal.error.TelloError: TelloError::timeout
TelloError::timeout
そこで、MuAuanさんのコードの中で__drone.takeoff()__のコードが記載された位置を変えてみました。
実行方法
% python3 MuAuan_takeoffVideo.py
もとのスクリプトファイル (MuAuanさんオリジナル)
import sys
import traceback
import tellopy
import av
import cv2.cv2 as cv2 # for avoidance of pylint error
import numpy
import time
from time import sleep
def videoCamera(start_time,container,drone,set_time):
frame_skip = 300
while True:
for frame in container.decode(video=0):
if 0 < frame_skip:
frame_skip = frame_skip - 1
continue
image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR)
cv2.imshow('Original', image)
#cv2.imshow('Canny', cv2.Canny(image, 100, 200))
cv2.waitKey(1)
if frame.time_base < 1.0/60:
time_base = 1.0/60
else:
time_base = frame.time_base
frame_skip = int((time.time() - start_time)/time_base)
if time.time() - start_time > set_time:
return
else:
continue
def main():
drone = tellopy.Tello()
try:
drone.connect()
drone.wait_for_connection(60.0)
drone.takeoff()
sleep(10)
container = av.open(drone.get_video_stream())
start_time = time.time()
videoCamera(start_time,container,drone,10)
drone.down(50)
sleep(10)
start_time = time.time()
videoCamera(start_time,container,drone,10)
drone.land()
sleep(10)
videoCamera(start_time,container,drone,10)
drone.quit()
cv2.destroyAllWindows()
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
print(ex)
finally:
drone.quit()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
コードを微修正して、再度挑戦
実行方法
% python3 MuAuan_takeoffVideo_2.py
実行結果
得られた挙動は、次のようなものでした。
- 画像が入るまで、時間がかかった。
- 画像が入った後、少し待つと、Telloが離陸した。
- 離陸が終わり、空中に停止するホバリング動作に入ってから、数秒経つと、ホバリングしている位置での画像が入った。
- 左移動が終わり、再びホバリングした後に、ホバリングしている位置での画像が入った。
- 上昇中、下降中、前後左右移動中(以下、各アクションと表記)の画像が入らない。
- Telloの前に立って手を振ったところ、画像が体感感覚的に、数秒遅れて、PC側のWindowに表示される。リアルタイムの画像受信&表示ができない。
動画がリアルタイムに取得できない事象には、MuAuanさんも直面していたようで、今回参考にした記事に続く後続の記事の中で、MuAuanさんは、動画をリアルタイムに取得できるように、さらに工夫を重ねたいと語っています。
修正後のスクリプト・ファイル
import sys
import traceback
import tellopy
import av
import cv2.cv2 as cv2 # for avoidance of pylint error
import numpy
import time
from time import sleep
def videoCamera(start_time, container, drone, set_time):
frame_skip = 10
while True:
for frame in container.decode(video=0):
if 0 < frame_skip:
frame_skip = frame_skip - 1
continue
image = cv2.cvtColor(numpy.array(frame.to_image()), cv2.COLOR_RGB2BGR)
cv2.imshow('Original', image)
#cv2.imshow('Canny', cv2.Canny(image, 100, 200))
cv2.waitKey(1)
if frame.time_base < 1.0/60:
time_base = 1.0/60
else:
time_base = frame.time_base
frame_skip = int((time.time() - start_time)/time_base)
if time.time() - start_time > set_time:
return
else:
continue
def main():
drone = tellopy.Tello()
try:
drone.connect()
drone.wait_for_connection(60.0)
container = av.open(drone.get_video_stream())
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
start_time = time.time()
drone.takeoff()
sleep(2)
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
drone.left(30)
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
drone.right(50)
drone.up(15)
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
drone.down(30)
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
drone.land()
start_time = time.time()
videoCamera(start_time, container, drone, 3.0)
sleep(5)
drone.quit()
cv2.destroyAllWindows()
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
print(ex)
finally:
drone.quit()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
```Python3:
# simple example demonstrating how to control a Tello using your keyboard.
# For a more fully featured example see manual-control-pygame.py
#
# Use W, A, S, D for moving, E, Q for rotating and R, F for going up and down.
# When starting the script the Tello will takeoff, pressing ESC makes it land
# and the script exit.
from timeout_decorator import timeout, TimeoutError
from djitellopy import Tello
import cv2, math, time
TIMEOUT_SEC = 0.5
@timeout(TIMEOUT_SEC)
def input_with_timeout(msg=None):
return input(msg)
tello = Tello()
tello.connect()
tello.streamon()
frame_read = tello.get_frame_read()
# tello.takeoff()
while True:
# In reality you want to display frames in a seperate thread. Otherwise
# they will freeze while the drone moves.
img = frame_read.frame
cv2.imshow("drone", img)
#次の行(key = cv2.・・・)を削除すると、画像が受信できなくなる。
key = cv2.waitKey(1) & 0xff
try:
msg = input_with_timeout('\n{}秒以内に操作コマンドを入力して下さい :'.format(TIMEOUT_SEC))
print('\n操作コマンド: {} を受信しました。\n'.format(msg))
if msg == "i":
tello.takeoff()
elif msg == "w":
tello.move_forward(30)
elif msg == "s":
tello.move_back(30)
elif msg == "a":
tello.move_left(30)
elif msg == "d":
tello.move_right(30)
elif msg == "e":
tello.rotate_clockwise(30)
elif msg == "q":
tello.rotate_counter_clockwise(30)
elif msg == "r":
tello.move_up(30)
elif msg == "f":
tello.move_down(30)
elif msg == "g":
tello.land()
except TimeoutError:
print('\n操作コマンド入力時間切れ。次のフレーム画像を読み込みます。\n')
tello.land()