以前の記事の続きです。
(draftですが一応公開します。)
概要
Robomasters1をPythonでハックします。
- アプリで書いたPythonプログラムがどのように実行されているのか探る
- UDPで通信をさせてみる
- Robomasterのアプリから画像を拾ってくる
- 画像を元にコントロール(次回)
前提
- ルート化or内部ファイルへのアクセス(前回の記事を参照)
Python in Robomaster S1
Robomasters1のPython環境はPython3.6.6です。
binは/data/python_scripts
以下にあり,root化したコマンドラインからも実行できます。
アプリ上で書いたファイルの場所と形式
アプリ上で編集したPythonファイルは/data/scripts/files
にあります。
アプリ上とは名前が違うのでls -l
とかで更新日時とかで頑張って追うしかなさそうです。
前回書いたroot化のプログラムに当たる部分を開いてみると,諸機能でwrapされていることがわかります。
クリックして展開
import event_client
import rm_ctrl
import rm_define
import math
import traceback
import rm_log
logger = rm_log.dji_scratch_logger_get()
event = event_client.EventClient()
modulesStatus_ctrl = rm_ctrl.ModulesStatusCtrl(event)
gun_ctrl = rm_ctrl.GunCtrl(event)
armor_ctrl = rm_ctrl.ArmorCtrl(event)
vision_ctrl = rm_ctrl.VisionCtrl(event)
chassis_ctrl = rm_ctrl.ChassisCtrl(event)
gimbal_ctrl = rm_ctrl.GimbalCtrl(event)
robot_ctrl = rm_ctrl.RobotCtrl(event, chassis_ctrl, gimbal_ctrl)
log_ctrl = rm_ctrl.LogCtrl(event)
# scratch mode only
led_ctrl = rm_ctrl.LedCtrl(event)
media_ctrl = rm_ctrl.MediaCtrl(event)
# need replaced when app changed the method name
time = rm_ctrl.RobotTools(event)
tools = rm_ctrl.RobotTools(event)
debug_ctrl = rm_ctrl.DebugCtrl(event)
mobile_ctrl = rm_ctrl.MobileCtrl(event)
blaster_ctrl = gun_ctrl
AI_ctrl = vision_ctrl
show_msg = log_ctrl.show_msg
print_msg = log_ctrl.print_msg
info_msg = log_ctrl.info_msg
debug_msg = log_ctrl.debug_msg
error_msg = log_ctrl.error_msg
fatal_msg = log_ctrl.fatal_msg
print=print_msg
robot_mode = rm_define.robot_mode
chassis_status = rm_define.chassis_status
gimbal_status = rm_define.gimbal_status
detection_type = rm_define.detection_type
detection_func = rm_define.detection_func
led_effect = rm_define.led_effect
led_position = rm_define.led_position
pwm_port = rm_define.pwm_port
line_color = rm_define.line_color
def robot_reset():
robot_ctrl.set_mode(rm_define.robot_mode_free)
gimbal_ctrl.resume()
gimbal_ctrl.recenter(90)
def robot_init():
if 'speed_limit_mode' in globals():
chassis_ctrl.enable_speed_limit_mode()
robot_ctrl.init()
modulesStatus_ctrl.init()
gimbal_ctrl.init()
chassis_ctrl.init()
led_ctrl.init()
gun_ctrl.init()
chassis_ctrl.init()
mobile_ctrl.init()
tools.init()
robot_reset()
def ready():
robot_init()
robot_ctrl.set_mode(rm_define.robot_mode_gimbal_follow)
tools.program_timer_start()
def register_event():
armor_ctrl.register_event(globals())
vision_ctrl.register_event(globals())
media_ctrl.register_event(globals())
chassis_ctrl.register_event(globals())
def start():
pass
def stop():
event.script_state.set_script_has_stopped()
block_description_push(id="ABCDEFGHIJ4567890123", name="STOP", type="INFO_PUSH", curvar="")
def robot_exit():
robot_reset()
robot_ctrl.exit()
gimbal_ctrl.exit()
chassis_ctrl.exit()
gun_ctrl.exit()
mobile_ctrl.exit()
armor_ctrl.exit()
media_ctrl.exit()
try:
ready()
# replace your python code here
def root_me(module):
__import__=rm_log.__dict__['__builtins__']['__import__']
return __import__(module,globals(),locals(),[],0)
builtins=root_me('builtins')
subprocess=root_me('subprocess')
proc=subprocess.Popen('/system/bin/adb_en.sh',shell=True,executable='/system/bin/sh',stdout=subprocess.PIPE,stderr=subprocess.PIPE)
register_event()
start()
stop()
except:
_error_msg = traceback.format_exc()
logger.error('MAIN: script exit, message: ')
logger.error('TRACEBACK:\n' + _error_msg)
finally:
gun_ctrl.stop()
chassis_ctrl.stop()
gimbal_ctrl.stop()
media_ctrl.stop()
vision_ctrl.stop()
armor_ctrl.stop()
robot_exit()
event.stop()
del event
アプリ上でのPythonプログラム実行の流れ
超ざっくり書くと,プログラムの流れは
- ユーザがアプリのグローバルスコープで書いたコード
- イベント予約,初期位置
- ユーザーがstart()関数に書いた処理
- 後片付け(含ホームポジション戻り)
となるはずです。
関数制限,import制限の突破
robomasters1の環境では標準のPython+DJIの関数があるので色々できるかと思いますが,実際にアプリ内でsocketやsubprocessを走らせようとすると弾かれてしまいます。
アプリでの実行時,/data/dji_scratch/libscript_manage.py
というコードが実行されており,それによりホワイトリストにない関数やimportはエラーを出すようになっています。
具体的には以下のようになってます。
# need to add safe modules name
safe_module_names = [
'event_client',
'rm_ctrl',
'rm_define',
'rm_block_description',
'rm_log',
'tools',
'time',
'math',
'random',
'threading',
'traceback',
'tracemalloc',
]
def _hook_import(name, *args, **kwargs):
if name in safe_module_names:
return __import__(name, *args, **kwargs)
else:
raise RuntimeError('invalid module, the module is ' + str(name))
_builtins = {'__import__':_hook_import}
従って,このホワイトリストを更新します。
自分が追加したのは以下の関数です。
- モジュール: sys,socket,subprocess,contextlib
- 関数: min,max
一応ファイルを置いておきます。変更したscript_manager.py
変更の後,adbを使ってこれを送りつけます。
.\adb.exe push <ローカルパス>\script_manage.py /data/dji_scratch/lib
さて,これで一応Python側でコードを実行する準備ができました。
Video Capture
PC側からの制御のためになんとかRobomasterS1のVideo出力をCaptureしたいです。
配布SDKさえ対応してくれれば,特定のポートをListenするだけで良さそうなのですが,サポートへの問い合わせ待ちです。
(できたら教えて下さい。)
仕方がないので妥協策としてここでは,rosmasterのアプリの画面をキャプチャします。
流れは,
- OBS Studioを管理者権限で起動。(なぜならばRobomasterのアプリが管理者権限で起動するから)
- シーンを追加,ソースから「ゲームキャプチャを追加」(下図参照)
- 「ツール」→「Virtual Cam」からキャプチャした画像をBroadcast
となります。
OpenCVのVideoCaptureの番号を適当に探せば適切なStreamを見つけられるはずです。
実際の動作
Robomasters1のカメラに映る赤い物体をトラッキングしました。
手順としては
- コード実行画面からコード実行
- コード実行画面の右からcamera画像を表示させて全画面化
- Alt+Tabで画面切り替えしてトラッキングプログラムを起動
という工程で実行します。
RobomasterからPCへの通信と,OBSのキャプチャ,そのエクスポートと3段階のラグ要因を挟んではいますが,それなりに動作します。
自分の過去に作ったTrackingパッケージ使いにくすぎて笑った。
— 柏のトトロ (@ossyaritoori) August 13, 2020
とりまVtuber作戦でビデオを取得。#robomasters1 pic.twitter.com/MNkCGfUKmj
PC側とUDP通信(後で)
socketを用いて書きます。
TCPも試しましたが,接続ロストしたりエラーはいた時の後処理がだるいのでUDPだけ載せます。
IP,ポート確認
この辺の調査は体当たりでもいいですが,adbのshellが開けているなら比較的簡単に調べられます。
IPは
ifconfig wlan0
を実行します。
netstat
コマンドで使われているポートを確認しておきましょう。(本当はSDKモードが有効になっているなら5桁チャンネルがあるはずですが今後のアップデート次第です。)
1|root@xw607_dz_ap0002_v4:/ # netstat
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 0.0.0.0:8905 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8906 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8907 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:5037 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8909 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8910 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8912 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8913 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8916 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:21 0.0.0.0:* LISTEN
udp 0 0 0.0.0.0:67 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:67 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:67 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:10607 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:58000 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:35476 0.0.0.0:* CLOSE
udp 0 0 0.0.0.0:56789 0.0.0.0:* CLOSE
ソケットプログラミング
UDPのほうが後始末が楽だったのでUDPでやります。
クライアント(PC)
マルチスレッドでやると画像処理などのメインの処理の負荷が多少下がるのでおすすめです。
import socket
import threading
class ClientThread(threading.Thread):
def __init__(self, PORT=88888,HOST="192.168.100.111"):
threading.Thread.__init__(self)
self.kill_flag = False
# line information
self.HOST = HOST
self.PORT = PORT
self.BUFSIZE = 1024
self.ADDR = (HOST, self.PORT)
# tcp/udp
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def run(self):
while True:
try:
global msg
# tcp
#self.sock.connect(self.ADDR)
#self.sock.send(msg.encode())
# udp
self.udpsock.sendto(msg.encode(), self.ADDR)
time.sleep(0.001)
print("send")
# udp recv
data, addr = self.udpsock.recvfrom(self.BUFSIZE)
print(data)
except:
pass
self.udpsock.close()
#print("close")
self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
th = ClientThread(PORT=port,HOST=host)
th.setDaemon(True)
th.start()
サーバ(Robomasters1)
Classを実装しようとした所,やはり怒られたのでこっちはマルチスレッドは諦めました。
import socket
def run_server():
# parameter
host = '192.168.100.111'
port = 88888
backlog = 5
buf_size = 1024
timeout = 20
#init TCP
#sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#sock.settimeout(timeout)
udpServSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udpServSock.bind((host,port)) # HOST, PORTでbinding
udpServSock.settimeout(timeout)
while True:
#udp
msg, cliaddr = udpServSock.recvfrom(buf_size) # データ受信
# 何某かの処理
data = "hoge"
udpServSock.sendto(data.encode(),cliaddr)
return
ロボットのコントロール(後で)
Pythonでロボットを動作させるときの簡単な関数の説明は以下にあります。
ジンバル制御:位置制御と速度制御
- 右向きがYawの正,上向きがPitchの正
- 操作モードのうち,gimbal_followモードかfreeモードが良さそう
以下のコードで動作を確認できると思います。
- 位置制御確認プログラム
# Gimbal Lead
robot_ctrl.set_mode(rm_define.robot_mode_chassis_follow)
# Set chasis to follow Gimbal
chassis_ctrl.set_follow_gimbal_offset(0)
chassis_ctrl.set_rotate_speed(180)
gimbal_ctrl.set_rotate_speed(100)
# yaw positive is right side
gimbal_ctrl.yaw_ctrl(60)
gimbal_ctrl.yaw_ctrl(-60)
gimbal_ctrl.yaw_ctrl(0)
- 速度制御確認プログラム
# Gimbal Lead
robot_ctrl.set_mode(rm_define.robot_mode_chassis_follow)
# Set chasis to follow Gimbal
chassis_ctrl.set_follow_gimbal_offset(0)
chassis_ctrl.set_rotate_speed(180)
gimbal_ctrl.set_rotate_speed(100)#無意味
gimbal_ctrl.rotate_with_speed(30, 10)
# 速度のOpenループは許されなかったorz
gimbal_ctrl.angle_ctrl(60, 15)
ビジュアルトラッキング(次回)
絶賛動作部分のバグに悩まされ中。今回の要素の組み合わせになります。
- PCで画像を取得
- 位置指令を作成
- socketで通信