Telloをpythonで動かす。
公式のTello-pythonを参考にすれば割と簡単にpythonで制御プログラムをかけそうだったが,カメラ映像を受信するのにハマったのでメモ。
リポジトリではh264decoderをビルドして使えと書いているが,自分のmac環境ではいろいろ試したがうまくビルドできなかった。
特にPython3環境ではビルドに修正が必要になる模様。
(補足)Python3で「Tello_Video」を動かす
トイドローン Tello をMacで操作&カメラ映像をOpenCVで受信してみる
上記の記事ではストリームを整形したあとopencvでデコードしている。
直接ストリームをデコードするとノイズだらけになると記事にあるが、試しにそのままやってみると自分の環境では普通にデコードできた。しかしその後、いろいろh264decoderのビルドを試すときOSやxcodeのアップデートをしているうちにopencvではデコードできなくなってしまった。
原因がよくわからなかったが、最終的にopencv-pythonだけではだめで,opencv-contrib-pythonをインストールすればデコードできることがわかった。
(h264のデコードに権利的な何かが絡む?)
他の環境でもうまくいくのか不明だが,h264decoderのビルドがうまく行かない場合は試してみてもいいかも。
python3で動かしたいならこの方法が一番ラクと思われる。
Tello-pythonのTello_Video/tello.pyをopencvバージョンに修正したのが以下。
import socket
import threading
import time
import numpy as np
import cv2
class Tello:
"""Wrapper class to interact with the Tello drone with opencv."""
def __init__(self, local_ip='', local_port=8889, imperial=False, command_timeout=.3, tello_ip='192.168.10.1',
tello_port=8889):
"""
Binds to the local IP/port and puts the Tello into command mode.
:param local_ip (str): Local IP address to bind.
:param local_port (int): Local port to bind.
:param imperial (bool): If True, speed is MPH and distance is feet.
If False, speed is KPH and distance is meters.
:param command_timeout (int|float): Number of seconds to wait for a response to a command.
:param tello_ip (str): Tello IP.
:param tello_port (int): Tello port.
"""
self.abort_flag = False
self.command_timeout = command_timeout
self.imperial = imperial
self.response = None
self.frame = None # numpy array BGR -- current camera output frame
self.is_freeze = False # freeze current camera output
self.last_frame = None
self.socket = socket.socket(
socket.AF_INET, socket.SOCK_DGRAM) # socket for sending cmd
self.tello_address = (tello_ip, tello_port)
self.last_height = 0
self.socket.bind((local_ip, local_port))
# thread for receiving cmd ack
self.receive_thread = threading.Thread(target=self._receive_thread)
self.receive_thread.daemon = True
self.receive_thread.start()
# to receive video -- send cmd: command, streamon
self.socket.sendto(b'command', self.tello_address)
print('sent: command')
self.socket.sendto(b'streamon', self.tello_address)
print('sent: streamon')
# for opencv
self.VS_UDP_IP = '0.0.0.0'
self.VS_UDP_PORT = 11111
self.udp_video_address = 'udp://@' + self.VS_UDP_IP + ':' + str(self.VS_UDP_PORT)
self.cap = cv2.VideoCapture(self.udp_video_address)
# thread for receiving video
self.receive_video_thread = threading.Thread(
target=self._receive_video_thread)
self.receive_video_thread.daemon = True
self.receive_video_thread.start()
def __del__(self):
"""Closes the local socket."""
self.socket.close()
self.cap.release()
def read(self):
"""Return the last frame from camera."""
if self.is_freeze:
return self.last_frame
else:
return self.frame
def video_freeze(self, is_freeze=True):
"""Pause video output -- set is_freeze to True"""
self.is_freeze = is_freeze
if is_freeze:
self.last_frame = self.frame
def _receive_thread(self):
"""Listen to responses from the Tello.
Runs as a thread, sets self.response to whatever the Tello last returned.
"""
while True:
try:
self.response, ip = self.socket.recvfrom(3000)
# print(self.response)
except socket.error as exc:
print("Caught exception socket.error : %s" % exc)
def _receive_video_thread(self):
"""
Listens for video streaming (raw h264) from the Tello with opencv.
Runs as a thread, sets self.frame to the most recent frame Tello captured.
"""
while True:
ret, self.frame = self.cap.read()
def send_command(self, command):
"""
Send a command to the Tello and wait for a response.
:param command: Command to send.
:return (str): Response from Tello.
"""
print(">> send cmd: {}".format(command))
self.abort_flag = False
timer = threading.Timer(self.command_timeout, self.set_abort_flag)
self.socket.sendto(command.encode('utf-8'), self.tello_address)
timer.start()
while self.response is None:
if self.abort_flag is True:
break
timer.cancel()
if self.response is None:
response = 'none_response'
else:
response = self.response.decode('utf-8')
self.response = None
return response
def set_abort_flag(self):
"""
Sets self.abort_flag to True.
Used by the timer in Tello.send_command() to indicate to that a response
timeout has occurred.
"""
self.abort_flag = True
def takeoff(self):
"""
Initiates take-off.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('takeoff')
def set_speed(self, speed):
"""
Sets speed.
This method expects KPH or MPH. The Tello API expects speeds from
1 to 100 centimeters/second.
Metric: .1 to 3.6 KPH
Imperial: .1 to 2.2 MPH
Args:
speed (int|float): Speed.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
speed = float(speed)
if self.imperial is True:
speed = int(round(speed * 44.704))
else:
speed = int(round(speed * 27.7778))
return self.send_command('speed %s' % speed)
def rotate_cw(self, degrees):
"""
Rotates clockwise.
Args:
degrees (int): Degrees to rotate, 1 to 360.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('cw %s' % degrees)
def rotate_ccw(self, degrees):
"""
Rotates counter-clockwise.
Args:
degrees (int): Degrees to rotate, 1 to 360.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('ccw %s' % degrees)
def flip(self, direction):
"""
Flips.
Args:
direction (str): Direction to flip, 'l', 'r', 'f', 'b'.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('flip %s' % direction)
def get_response(self):
"""
Returns response of tello.
Returns:
int: response of tello.
"""
response = self.response
return response
def get_height(self):
"""Returns height(dm) of tello.
Returns:
int: Height(dm) of tello.
"""
height = self.send_command('height?')
height = str(height)
height = filter(str.isdigit, height)
try:
height = int(height)
self.last_height = height
except:
height = self.last_height
pass
return height
def get_battery(self):
"""Returns percent battery life remaining.
Returns:
int: Percent battery life remaining.
"""
battery = self.send_command('battery?')
try:
battery = int(battery)
except:
pass
return battery
def get_flight_time(self):
"""Returns the number of seconds elapsed during flight.
Returns:
int: Seconds elapsed during flight.
"""
flight_time = self.send_command('time?')
try:
flight_time = int(flight_time)
except:
pass
return flight_time
def get_speed(self):
"""Returns the current speed.
Returns:
int: Current speed in KPH or MPH.
"""
speed = self.send_command('speed?')
try:
speed = float(speed)
if self.imperial is True:
speed = round((speed / 44.704), 1)
else:
speed = round((speed / 27.7778), 1)
except:
pass
return speed
def land(self):
"""Initiates landing.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.send_command('land')
def move(self, direction, distance):
"""Moves in a direction for a distance.
This method expects meters or feet. The Tello API expects distances
from 20 to 500 centimeters.
Metric: .02 to 5 meters
Imperial: .7 to 16.4 feet
Args:
direction (str): Direction to move, 'forward', 'back', 'right' or 'left'.
distance (int|float): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
distance = float(distance)
if self.imperial is True:
distance = int(round(distance * 30.48))
else:
distance = int(round(distance * 100))
return self.send_command('%s %s' % (direction, distance))
def move_backward(self, distance):
"""Moves backward for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('back', distance)
def move_down(self, distance):
"""Moves down for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('down', distance)
def move_forward(self, distance):
"""Moves forward for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('forward', distance)
def move_left(self, distance):
"""Moves left for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('left', distance)
def move_right(self, distance):
"""Moves right for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
"""
return self.move('right', distance)
def move_up(self, distance):
"""Moves up for a distance.
See comments for Tello.move().
Args:
distance (int): Distance to move.
Returns:
str: Response from Tello, 'OK' or 'FALSE'.
"""
return self.move('up', distance)