環境
| 機器・ソフト | バージョン |
|---|---|
| ノートPC | Windows10 Home |
| Python | 3.7.0 |
| opencv-python | 3.4.2.17 |
自作モジュール解説
自作モジュールはGitHubにおいてあります。
各種セットアップ
セットアップはsetup_tello()で行っています。__init__()でもよかったのですが……
SDKコマンドやりとり
TelloへのSDKコマンドの送信またそれに対する応答は113行目にあるgo_tello()で制御しています。
まずgo_tello()に引数でコマンドを受け取りコマンドをsock.sendto()で送ります。その後154行目のwhileループに突入し、self.RECIEVE_DATAがNoneの間すなわち、ドローンから応答が返って来るまで処理待ち状態にします。ドローンからの応答は、
実行成功: OK
実行失敗: error (エラー文が続く場合がある)
のような形式で返ってきます。
ドローンの応答が返ってきたら、クールダウンを行います。TELLO-EDUはコマンドとコマンドの間を短くするとerror not joystickなるエラーが返ってきます。なのでthreading.Timer()を用いて処理待ち状態にします。ちなみにtime.sleep()だとなぜかうまくいきませんでした。
def __receive_thread__(self):
"""
Telloからの応答監視スレッド
最後の応答がreceive_dataに格納される
"""
while self._receive_going:
try:
self.RECEIVE_DATA, ip = self.sock.recvfrom(3000)
except:
print(traceback.format_exc())
def go_tello(self, S, interval=1.0):
"""
S : コマンド(大文字小文字どちらでもいい)
interval : 入力コマンド間の遅延
command
takeoff
land
up[down, left, right, forward, back] "Value" : "Value" = 20-500
cw[ccw] "Value" : "Value" = 1-360
flip "Direction" : "Direction" = l(left), r(right), f(forward), b(back)
"""
# command, emergency は処理上ない
_CONTROL_COMMANDS_ = ["takeoff", "land", "streamon", "streamoff", "up", "down", "left",
"right", "forward", "back", "cw", "ccw", "flip", "go", "stop",
"curve", "jump"]
_SET_COMMANDS_ = ["speed", "rc", "wifi",
"mon", "moff", "mdirection", "ap"]
_READ_COMMANDS_ = ["speed?", "battery?", "time?", "wifi?", "height?", "temp?", "attitude?"
"baro?", "acceleration?", "tof?"]
_ALL_COMMANDS_ = _CONTROL_COMMANDS_ + _SET_COMMANDS_ + _READ_COMMANDS_
if isinstance(S, str):
S = S.lower()
self.DATA_FLAG = False
sent = self.sock.sendto(S.encode("utf-8"),
self.tello_address) # コマンド送信
print(S, ">> Tello")
# タイマースレッドで処理待ち状態に
# time.sleepとかだとうまくいかない
timer = threading.Timer(interval, self.set_flag_TRUE)
timer.start() # タイマースレッドスタート
S = S.split()
S = S[0]
while self.RECEIVE_DATA is None:
if S in _ALL_COMMANDS_:
continue
if self.DATA_FLAG:
break
timer.cancel() # タイマースレッドストップ
if self.RECEIVE_DATA is None:
response = "NULL"
else:
response = self.RECEIVE_DATA.decode("utf-8")
self.RECEIVE_DATA = None
print(response + " << Tello")
else:
raise TypeError
self.DATA_FLAG = False
timer = threading.Timer(interval, self.set_flag_TRUE)
timer.start() # タイマースレッドスタート
while True:
if self.DATA_FLAG:
break
カメラ制御
Telloのカメラ制御にはpythonのopencv-pythonモジュールを使用しました。(バージョンは最新のものではない)
カメラ制御の挙動としては、Telloが他のSDKコマンドを実行している最中でもPCでTelloの映像をリアルタイムで表示できるようにしました。再びthreadingを使用します。
VIDEO_FRAMEにフレームデータを格納し、それをcv2.imshow()で表示しています。cv2.waitKey()はキー入力を引数に与えた時間(単位はミリ秒)だけ待つ関数で一見必要ないように見えますが、これがないとcv2.imshow()ができないので入れています。
def __video_thread__(self):
"""
ビデオスレッド
カメラのフレームデータをself.VIDEO_FRAMEに格納する
"""
self.VIDEO_FRAME = None
self.rect = []
while self._video_going:
ret, self.VIDEO_FRAME = self.cap.read()
if len(self.rect) == 1: # 顔が検出された場合、四角で囲む
x, y, w, h = self.rect[0]
cv2.rectangle(self.VIDEO_FRAME, tuple([x, y]), tuple([x + w, y + h]), (0, 0, 255), thickness=3)
cv2.imshow("frame_data", self.VIDEO_FRAME)
_ = cv2.waitKey(1)
#cv2.destroyAllWindows()
またtake_picture()という写真撮影を行う関数も作成しました。引数filenameは撮影した画像データを保存するときのファイル名で、特に指定がなければframe_filenameが適当にナンバリングしてくれます。
def take_picture(self, filename=None):
"""
Telloのカメラの最新フレームを保存する
保存先はself.frame_save_path
保存名はself.frame_filename
"""
if filename is None: # ユーザがファイル名を指定しなかった場合ファイル名をナンバリング
filename = self.frame_filename
self.frame_filename = str(int(self.frame_filename))
self.frame_filename = str(eval(self.frame_filename + " + 1"))
self.frame_filename = self.frame_filename.zfill(6) # 整形のためゼロ埋め
filename = self.frame_filename
cv2.imwrite(self.frame_save_path + filename + ".jpg", self.VIDEO_FRAME)
print(f"Saved the photo as filename:{filename} << Tello")
顔認識
顔認識にはOpenCVに付属していたカスケード分類器を使用しました。付属の分類器の顔認識の(大雑把な)精度は以下の通りでした。
| 分類器 | 精度 |
|---|---|
| haarcascade_frontalcatface | 良くなかった |
| haarcascade_frontalcatface_extended | 良くなかった |
| haarcascade_frontalface_alt | 多少認識する |
| haarcascade_frontalface_alt_tree | 良くなかった |
| haarcascade_frontalface_alt2 | 良かった |
| haarcascade_frontalface_default | 良かった |
pytello.pyにある通り私が実装した際にはhaarcascade_frontalface_default.xmlを使用しました(alt2でもいい感じかも)。
顔認識は294行目のface_detection()にあります。スレッドで走らせている__video_thread__()のVIDEO_FRAMEからopencvのフレームデータを読み取り、グレースケールにしてHaarCascadeで殴っています。
def face_detection(self):
"""
現在のフレームデータ(self.VIDEO_FRAME)にて人の顔があるかどうか識別
肝心な顔認識の部分はHaar Cascadeで殴ってる
返し値は認識した顔の[(左上のx座標), (左上のy座標), (横幅), (縦線)]が、
認識した顔の数だけlistで返される
"""
self.rect = []
cv2.imwrite("./_camera_frame/temp.jpg", self.VIDEO_FRAME) # いったん空打
src = cv2.imread("./_camera_frame/temp.jpg", 0)
gray = cv2.cvtColor(src, cv2.cv2.COLOR_BAYER_BG2GRAY)
cascade = cv2.CascadeClassifier(self.cascade_path)
self.rect = cascade.detectMultiScale(gray, minSize=(100, 100))
return self.rect
色認識
色認識はHSV閾値を用いて行いました。
color_detection()に引数upper, lowerとして閾値を入力します。引数を受け取った関数はVIDEO_FRAMEからフレームデータを取得しcv2.cvtColor()でHSVに変換して、指定色範囲以外をマスクします。マスク結果はfind_specific_color()に引数として渡されます。
find_specific_color()は特定の色の座標を返すプログラムで、TonyMoooriさんのGiuHubから使用しました。
def color_detection(self, lower, upper):
"""
閾値に対して色検出を行いその座標を返す。
lower, upperにそれぞれ[h, s, v]を指定してぶち込む。
"""
cv2.imwrite("./_camera_frame/temp.jpg", self.VIDEO_FRAME)
img = cv2.imread("./_camera_frame/temp.jpg")
# HSVに変換
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# numpyのarrayじゃなきゃだめじゃね?
lower = np.array(lower)
upper = np.array(upper)
# 指定色範囲以外をマスク
img_mask = cv2.inRange(hsv, lower, upper)
res = cv2.bitwise_and(img, img, mask=img_mask)
cv2.imwrite("./_camera_frame/teemp.jpg", res) # マスクした結果を保存
# 認識対象の色のx座標, y座標, 面積を返す
return self.find_specific_color(img, 0, lower, upper)
def find_specific_color(self, frame, AREA_RATIO_THRESHOLD, LOW_COLOR, HIGH_COLOR): # https://gist.github.com/TonyMooori/4cc29c94f7bdbade6ff6102fef45232e
"""
指定した範囲の色の物体の座標を取得する関数
frame: 画像
AREA_RATIO_THRESHOLD: area_ratio未満の塊は無視する
LOW_COLOR: 抽出する色の下限(h,s,v)
HIGH_COLOR: 抽出する色の上限(h,s,v)
"""
h, w, c = frame.shape
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
ex_img = cv2.inRange(hsv, LOW_COLOR, HIGH_COLOR)
# 面積を計算
_, contours, hierarchy = cv2.findContours(
ex_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
areas = np.array(list(map(cv2.contourArea, contours)))
if len(areas) == 0 or np.max(areas) / (h * w) * 10000 < AREA_RATIO_THRESHOLD:
# 見つからなかったらNoneを返す
return None, None, None
else:
# 面積が最大の塊の重心を計算し返す
max_idx = np.argmax(areas)
max_area = areas[max_idx]
result = cv2.moments(contours[max_idx])
x = int(result["m10"]/result["m00"])
y = int(result["m01"]/result["m00"])
return [x, y, np.max(areas) / (h * w) * 10000]