TelloというドローンをPCから操作したり物体認識する方法を記載しています。
[参考にさせて頂いた記事とドキュメント]
記事➀:
PythonによるTello操作(基本、及びクラウドからのMQTTによる操作まで)
https://qiita.com/makotaka/items/e3ed92e58e9bc15cefbf
記事➁:
TELLOをpythonで動かしてみる
https://qiita.com/takanorimutoh/items/759734f17321344615b6
記事➂:
Telloドローンでプログラミング!ーディープラーニングで物体認識編ー
https://mitomoyo.com/2018-06-22-010142/
記事➃:
Pythonによるドローン「Tello」の制御
※UIによるTelloの制御
https://midoriit.com/2018/05/python%E3%81%AB%E3%82%88%E3%82%8B%E3%83%89%E3%83%AD%E3%83%BC%E3%83%B3%E3%80%8Ctello%E3%80%8D%E3%81%AE%E5%88%B6%E5%BE%A1.html
記事⑤:
【Tello】トイ・ドローンで遊んでみた♪~キーボードのキーを使ってTelloを制御
https://qiita.com/MuAuan/items/ccd0e7881fed8759904d
Tello SDK(pdfドキュメント)
https://terra-1-g.djicdn.com/2d4dce68897a46b19fc717f3576b7c6a/Tello%20%E7%BC%96%E7%A8%8B%E7%9B%B8%E5%85%B3/For%20Tello/Tello%20SDK%20Documentation%20EN_1.3_1122.pdf
#PC環境
windows10
Python 3.6 (Anaconda Navigator使用)
#TelloとPCとの接続
ドキュメント「Tello SDK」に記載されている通り、Tello IP: 192.168.10.1 に UDP PORT:8889 経由で接続することでPCからTelloを操作できます。
これを実現するには、PCからwifiでTELLO-XXXXXXといったアクセスポイントに接続して、下記のコマンドをPCで実行します。
※記事➀から引用
#!/usr/bin/env python
import socket
import time
#udpソケット作成
socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
tello_address = ('192.168.10.1', 8889)
#コマンドモードを使うため'command'というテキストを投げる
socket.sendto('command'.encode('utf-8'),tello_address)
この後、例えばtakeoffする場合は
time.sleep(5)
socket.sendto('takeoff'.encode('utf-8'),tello_address)
といったコードを実行します。
socket通信についてはこちらの記事が参考になりました↓
https://qiita.com/__init__/items/5c89fa5b37b8c5ed32a4
#キーボードからのコマンド入力でTelloを操作する
キーボードからのコマンド入力でTelloを操作する方法は記事➁に記載されており、Tello3.pyというファイルを実行した後は、
takeoff
forward 50
cw 360
land
のようにコマンドプロンプトにコマンドを入力して操作できるようになります。
こちらの環境ではTello3.pyにおいて
host = ''
port = 9000
locaddr = (host,port)
を削除し、
sock.bind(locaddr)
を
sock.sendto('command'.encode('utf-8'),tello_address)
time.sleep(1)
に変更すると動作しました。
Tello3.pyについてメモ:
- 入力したコマンドはinput関数によってmsgという変数に入れられ、sock.sendto(msg, tello_address)によってTelloに送信されます。
#Telloで物体認識を行う
TelloでSSDによる物体認識を行う方法は記事➂に記載されていますが、コード中のインデントがなくなっていてそのままでは実行できませんでした。
↓このサイトにインデントありのコードがあるので、こちらをコピーしました。
https://teratail.com/questions/218414
コピーした内容は、こちらのPC環境では
C:\Users\user\.conda\envs\droneenv\Lib\site-packages\tellopy\examples\video_effect.py
に上書きしました。
さらにSSDを使用するため
https://mmm-ssss.com/2018/11/26/ssd_keras_1/
に記載の通り、
git clone https://github.com/rykov8/ssd_keras.git
によってssd_kerasフォルダをコピーし、
このREADME.mdに書かれているリンク
https://mega.nz/folder/7RowVLCL#q3cEVRK9jyOSB9el3SssIA
から学習済みモデルweights_SSD300.hdf5をダウンロードしてssd_kerasフォルダに保存しました。
記事➂が書かれたころと比べてtensorflowやKerasなどのバージョンが新しくなっており、
記事➂の通りではエラーになりました。
試行錯誤した結果、各ツールを下記のバージョンにしました。
Keras 1.2.2
scipy 1.2.0
tensorflow 1.13.1
PCとTelloを接続した状態でコマンドプロンプトから
python -m tellopy.examples.video_effect
を実行すると物体認識が動作しました。
写真はパソコン画面をTV monitorとして認識している様子です。
使用したコードは以下です。
(元コードではssd_v2になっていますが今のPC環境では使えなかったのでssdに変更しています)
import cv2
import keras
from keras.applications.imagenet_utils import preprocess_input
from keras.backend.tensorflow_backend import set_session
from keras.models import Model
from keras.preprocessing import image
import matplotlib.pyplot as plt
import numpy as np
from scipy.misc import imread
import tensorflow as tf
#from ssd_v2 import SSD300v2
from ssd import SSD300
from ssd_utils import BBoxUtility
from PIL import Image
import sys
import traceback
import tellopy
import av
import numpy
plt.rcParams['figure.figsize'] = (8, 8)
plt.rcParams['image.interpolation'] = 'nearest'
np.set_printoptions(suppress=True)
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.45
set_session(tf.Session(config=config))
voc_classes = ['Aeroplane', 'Bicycle', 'Bird', 'Boat', 'Bottle',
'Bus', 'Car', 'Cat', 'Chair', 'Cow', 'Diningtable',
'Dog', 'Horse','Motorbike', 'Person', 'Pottedplant',
'Sheep', 'Sofa', 'Train', 'Tvmonitor']
NUM_CLASSES = len(voc_classes) + 1
input_shape=(100, 100, 3)
#model = SSD300v2(input_shape, num_classes=NUM_CLASSES)
model = SSD300(input_shape, num_classes=NUM_CLASSES)
model.load_weights('weights_SSD300.hdf5', by_name=True)
bbox_util = BBoxUtility(NUM_CLASSES)
global label_name
def getSSDImage(frame):
img2 = image.img_to_array(frame.resize((100, 100)))
img = np.asarray(frame)
inputs = []
inputs.append(img2.copy())
inputs = preprocess_input(np.array(inputs))
preds = model.predict(inputs, batch_size=1, verbose=1)
results = bbox_util.detection_out(preds)
# Parse the outputs.
det_label = results[0][:, 0]
det_conf = results[0][:, 1]
det_xmin = results[0][:, 2]
det_ymin = results[0][:, 3]
det_xmax = results[0][:, 4]
det_ymax = results[0][:, 5]
# Get detections with confidence higher than 0.6.
top_indices = [i for i, conf in enumerate(det_conf) if conf >= 0.6]
top_conf = det_conf[top_indices]
top_label_indices = det_label[top_indices].tolist()
top_xmin = det_xmin[top_indices]
top_ymin = det_ymin[top_indices]
top_xmax = det_xmax[top_indices]
top_ymax = det_ymax[top_indices]
colors = plt.cm.hsv(np.linspace(0, 1, 21)).tolist()
for i in range(top_conf.shape[0]):
xmin = int(round(top_xmin[i] * img.shape[1]))
ymin = int(round(top_ymin[i] * img.shape[0]))
xmax = int(round(top_xmax[i] * img.shape[1]))
ymax = int(round(top_ymax[i] * img.shape[0]))
score = top_conf[i]
label = int(top_label_indices[i])
label_name = voc_classes[label - 1]
display_txt = '{:0.2f}, {}'.format(score, label_name)
color = colors[label]
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (int(colors[label][0]*255), int(colors[label][1]*255), int(colors[label][2]*255)), 2)
cv2.rectangle(img, (xmin, ymin-15), (xmin+100, ymin+5), (int(colors[label][0]*255), int(colors[label][1]*255), int(colors[label][2]*255)),-1)
cv2.putText(img, display_txt, (xmin, ymin), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
imgcv = img[:, :, ::-1].copy()
return imgcv
def main():
drone = tellopy.Tello()
try:
drone.connect()
drone.wait_for_connection(60.0)
drone.set_loglevel(drone.LOG_INFO)
drone.set_exposure(0)
#container = av.open(drone.get_video_stream())
retry=3
container=None
while container is None and 0 < retry:
retry -= 1
try:
container = av.open(drone.get_video_stream())
except av.AVError as ave:
print(ave)
print('retry...')
frame_skip=300
frame_count = 0
while True:
for frame in container.decode(video=0):
frame_count = frame_count + 1
if (frame_count > 300) and (frame_count%50 == 0):
imgpil = frame.to_image()
image = getSSDImage(imgpil)
cv2.imshow('Original', image)
cv2.waitKey(1)
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
print(ex)
finally:
drone.quit()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
#tellopyについて
上記の物体認識ではtellopyをインポートしています。
tellopyのTelloクラスは
https://github.com/hanyazou/TelloPy/blob/develop-0.7.0/tellopy/_internal/tello.py
に定義されています。
tellopyではsend_packetという関数に
self.sock.sendto(cmd, self.tello_addr)
と書かれており、上記と同じくsocket通信でtelloを操作しています。
ただし、どうやらコマンドモードは使用しておらず、
例えばtakeoffの関数は
def takeoff(self):
"""Takeoff tells the drones to liftoff and start flying."""
log.info('set altitude limit 30m')
pkt = Packet(SET_ALT_LIMIT_CMD)
pkt.add_byte(0x1e) # 30m
pkt.add_byte(0x00)
self.send_packet(pkt)
log.info('takeoff (cmd=0x%02x seq=0x%04x)' % (TAKEOFF_CMD, self.pkt_seq_num))
pkt = Packet(TAKEOFF_CMD)
pkt.fixup()
return self.send_packet(pkt)
となっています。
TAKEOFF_CMDのような変数はMessage IDというものと対応しており
https://github.com/hanyazou/TelloPy/blob/develop-0.7.0/tellopy/_internal/protocol.py
に定義されています。
例:
TAKEOFF_CMD = 0x0054
#UIによるTelloの制御
記事④のコードを使用すると
というウィンドウが出てきてTakeoffとLandができます。
ただしPyQt5というモジュールが使われているため
pip install pyqt5
を実行する必要がありました。
#キーボードのキーでTelloを制御しながらPCでカメラ画像を見る
記事⑤のコードを使用すると、キーボードのキーでTelloを制御しながらPCでカメラ画像を見ることができます。
ただし、コードを実行するとカメラ画像が表示されるウィンドウが出て、qのキーを押してそれを消すとTelloがtakeoffし、また画像のウィンドウが出ました。
キーを押したときのTelloの反応や画像は何秒か遅れがありました。
#キーボードのキーでTelloを制御しながら物体認識する
上記の「Telloで物体認識を行う」のコードと記事⑤のコードを基に、キーボードのキーでTelloを制御しながら物体認識するコードを作成することができました。
ただし、キーで少し操作すると画像がフリーズしました。
物体認識したりキーによる操作をしたり、処理が重すぎるのかもしれないです。
(改善は試みますが、Telloの限界なのかも、、)
使用したコードはこちらです。
import cv2
import keras
from keras.applications.imagenet_utils import preprocess_input
from keras.backend.tensorflow_backend import set_session
from keras.models import Model
from keras.preprocessing import image
import matplotlib.pyplot as plt
import numpy as np
from scipy.misc import imread
import tensorflow as tf
#from ssd_v2 import SSD300v2
from ssd import SSD300
from ssd_utils import BBoxUtility
from PIL import Image
import sys
import traceback
import tellopy
import av
import numpy
plt.rcParams['figure.figsize'] = (8, 8)
plt.rcParams['image.interpolation'] = 'nearest'
np.set_printoptions(suppress=True)
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.45
set_session(tf.Session(config=config))
voc_classes = ['Aeroplane', 'Bicycle', 'Bird', 'Boat', 'Bottle',
'Bus', 'Car', 'Cat', 'Chair', 'Cow', 'Diningtable',
'Dog', 'Horse','Motorbike', 'Person', 'Pottedplant',
'Sheep', 'Sofa', 'Train', 'Tvmonitor']
NUM_CLASSES = len(voc_classes) + 1
input_shape=(100, 100, 3)
#model = SSD300v2(input_shape, num_classes=NUM_CLASSES)
model = SSD300(input_shape, num_classes=NUM_CLASSES)
model.load_weights('weights_SSD300.hdf5', by_name=True)
bbox_util = BBoxUtility(NUM_CLASSES)
global label_name
def getSSDImage(frame):
img2 = image.img_to_array(frame.resize((100, 100)))
img = np.asarray(frame)
inputs = []
inputs.append(img2.copy())
inputs = preprocess_input(np.array(inputs))
preds = model.predict(inputs, batch_size=1, verbose=1)
results = bbox_util.detection_out(preds)
# Parse the outputs.
det_label = results[0][:, 0]
det_conf = results[0][:, 1]
det_xmin = results[0][:, 2]
det_ymin = results[0][:, 3]
det_xmax = results[0][:, 4]
det_ymax = results[0][:, 5]
# Get detections with confidence higher than 0.6.
top_indices = [i for i, conf in enumerate(det_conf) if conf >= 0.6]
top_conf = det_conf[top_indices]
top_label_indices = det_label[top_indices].tolist()
top_xmin = det_xmin[top_indices]
top_ymin = det_ymin[top_indices]
top_xmax = det_xmax[top_indices]
top_ymax = det_ymax[top_indices]
colors = plt.cm.hsv(np.linspace(0, 1, 21)).tolist()
for i in range(top_conf.shape[0]):
xmin = int(round(top_xmin[i] * img.shape[1]))
ymin = int(round(top_ymin[i] * img.shape[0]))
xmax = int(round(top_xmax[i] * img.shape[1]))
ymax = int(round(top_ymax[i] * img.shape[0]))
score = top_conf[i]
label = int(top_label_indices[i])
label_name = voc_classes[label - 1]
display_txt = '{:0.2f}, {}'.format(score, label_name)
color = colors[label]
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (int(colors[label][0]*255), int(colors[label][1]*255), int(colors[label][2]*255)), 2)
cv2.rectangle(img, (xmin, ymin-15), (xmin+100, ymin+5), (int(colors[label][0]*255), int(colors[label][1]*255), int(colors[label][2]*255)),-1)
cv2.putText(img, display_txt, (xmin, ymin), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 1, cv2.LINE_AA)
imgcv = img[:, :, ::-1].copy()
return imgcv
def main():
drone = tellopy.Tello()
try:
drone.connect()
drone.wait_for_connection(60.0)
drone.set_loglevel(drone.LOG_INFO)
drone.set_exposure(0)
#container = av.open(drone.get_video_stream())
retry=3
container=None
while container is None and 0 < retry:
retry -= 1
try:
container = av.open(drone.get_video_stream())
except av.AVError as ave:
print(ave)
print('retry...')
frame_skip=300
frame_count = 0
drone.takeoff()
while True:
for frame in container.decode(video=0):
frame_count = frame_count + 1
if (frame_count > 300) and (frame_count%50 == 0):
imgpil = frame.to_image()
image = getSSDImage(imgpil)
cv2.imshow("org",image)
key = cv2.waitKey(1)&0xff
print("key=",key,ord('q'))
if key == ord('n'): #n
drone.down(10)
sleep(5)
elif key==ord('u'): #117: #u
drone.up(10)
sleep(5)
elif key==ord('h'): #104: #h
drone.left(3)
sleep(1)
elif key==ord('j'): #106: #j
drone.right(3)
sleep(1)
elif key==ord('b'): #backward
drone.backward(3)
sleep(1)
elif key==ord('f'): #forward
drone.forward(3)
sleep(1)
elif key==ord('c'): #clockwise
drone.clockwise(10)
sleep(1)
elif key==ord('q'): #quit
cv2.destroyAllWindows()
break
else:
continue
break
except Exception as ex:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
print(ex)
finally:
drone.down(50)
sleep(5)
drone.land()
sleep(5)
#drone.subscribe(drone.EVENT_FLIGHT_DATA, handler)
drone.quit()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
こちらの記事は進展があり次第更新していきます。