4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

RobomasterS1をハックしたい ②Python実行環境について

Last updated at Posted at 2020-08-14

以前の記事の続きです。
(draftですが一応公開します。)

概要

Robomasters1をPythonでハックします。

  • アプリで書いたPythonプログラムがどのように実行されているのか探る
  • UDPで通信をさせてみる
  • Robomasterのアプリから画像を拾ってくる
  • 画像を元にコントロール(次回)

前提

  • ルート化or内部ファイルへのアクセス(前回の記事を参照)

Python in Robomaster S1

Robomasters1のPython環境はPython3.6.6です。

binは/data/python_scripts以下にあり,root化したコマンドラインからも実行できます。

アプリ上で書いたファイルの場所と形式

アプリ上で編集したPythonファイルは/data/scripts/filesにあります。

アプリ上とは名前が違うのでls -lとかで更新日時とかで頑張って追うしかなさそうです。

前回書いたroot化のプログラムに当たる部分を開いてみると,諸機能でwrapされていることがわかります。

クリックして展開
import event_client
import rm_ctrl
import rm_define
import math
import traceback
import rm_log

logger = rm_log.dji_scratch_logger_get()
event = event_client.EventClient()
modulesStatus_ctrl = rm_ctrl.ModulesStatusCtrl(event)
gun_ctrl = rm_ctrl.GunCtrl(event)
armor_ctrl = rm_ctrl.ArmorCtrl(event)
vision_ctrl = rm_ctrl.VisionCtrl(event)
chassis_ctrl = rm_ctrl.ChassisCtrl(event)
gimbal_ctrl = rm_ctrl.GimbalCtrl(event)
robot_ctrl = rm_ctrl.RobotCtrl(event, chassis_ctrl, gimbal_ctrl)
log_ctrl = rm_ctrl.LogCtrl(event)
# scratch mode only
led_ctrl = rm_ctrl.LedCtrl(event)
media_ctrl = rm_ctrl.MediaCtrl(event)
# need replaced when app changed the method name
time = rm_ctrl.RobotTools(event)
tools = rm_ctrl.RobotTools(event)
debug_ctrl = rm_ctrl.DebugCtrl(event)
mobile_ctrl = rm_ctrl.MobileCtrl(event)
blaster_ctrl = gun_ctrl
AI_ctrl = vision_ctrl

show_msg = log_ctrl.show_msg
print_msg = log_ctrl.print_msg
info_msg = log_ctrl.info_msg
debug_msg = log_ctrl.debug_msg
error_msg = log_ctrl.error_msg
fatal_msg = log_ctrl.fatal_msg
print=print_msg

robot_mode = rm_define.robot_mode
chassis_status = rm_define.chassis_status
gimbal_status = rm_define.gimbal_status
detection_type = rm_define.detection_type
detection_func = rm_define.detection_func
led_effect = rm_define.led_effect
led_position = rm_define.led_position
pwm_port = rm_define.pwm_port
line_color = rm_define.line_color

def robot_reset():
    robot_ctrl.set_mode(rm_define.robot_mode_free)
    gimbal_ctrl.resume()
    gimbal_ctrl.recenter(90)

def robot_init():
    if 'speed_limit_mode' in globals():
        chassis_ctrl.enable_speed_limit_mode()

    robot_ctrl.init()
    modulesStatus_ctrl.init()
    gimbal_ctrl.init()
    chassis_ctrl.init()
    led_ctrl.init()
    gun_ctrl.init()
    chassis_ctrl.init()
    mobile_ctrl.init()
    tools.init()
    robot_reset()

def ready():
    robot_init()

    robot_ctrl.set_mode(rm_define.robot_mode_gimbal_follow)

    tools.program_timer_start()

def register_event():
    armor_ctrl.register_event(globals())
    vision_ctrl.register_event(globals())
    media_ctrl.register_event(globals())
    chassis_ctrl.register_event(globals())

def start():
    pass

def stop():
    event.script_state.set_script_has_stopped()
    block_description_push(id="ABCDEFGHIJ4567890123", name="STOP", type="INFO_PUSH", curvar="")

def robot_exit():
    robot_reset()
    robot_ctrl.exit()
    gimbal_ctrl.exit()
    chassis_ctrl.exit()
    gun_ctrl.exit()
    mobile_ctrl.exit()
    armor_ctrl.exit()
    media_ctrl.exit()

try:
    ready()

# replace your python code here
    def root_me(module):
     __import__=rm_log.__dict__['__builtins__']['__import__']
     return __import__(module,globals(),locals(),[],0)
    builtins=root_me('builtins')
    subprocess=root_me('subprocess')
    proc=subprocess.Popen('/system/bin/adb_en.sh',shell=True,executable='/system/bin/sh',stdout=subprocess.PIPE,stderr=subprocess.PIPE)


    register_event()
    start()
    stop()
except:
    _error_msg = traceback.format_exc()
    logger.error('MAIN: script exit, message: ')
    logger.error('TRACEBACK:\n' + _error_msg)
finally:
    gun_ctrl.stop()
    chassis_ctrl.stop()
    gimbal_ctrl.stop()
    media_ctrl.stop()
    vision_ctrl.stop()
    armor_ctrl.stop()
    robot_exit()
    event.stop()
    del event

アプリ上でのPythonプログラム実行の流れ

超ざっくり書くと,プログラムの流れは

  • ユーザがアプリのグローバルスコープで書いたコード
  • イベント予約,初期位置
  • ユーザーがstart()関数に書いた処理
  • 後片付け(含ホームポジション戻り)

となるはずです。

関数制限,import制限の突破

robomasters1の環境では標準のPython+DJIの関数があるので色々できるかと思いますが,実際にアプリ内でsocketやsubprocessを走らせようとすると弾かれてしまいます。

SocketError.png

アプリでの実行時,/data/dji_scratch/libscript_manage.pyというコードが実行されており,それによりホワイトリストにない関数やimportはエラーを出すようになっています

具体的には以下のようになってます。

# need to add safe modules name
safe_module_names = [
    'event_client',
    'rm_ctrl',
    'rm_define',
    'rm_block_description',
    'rm_log',
    'tools',
    'time',
    'math',
    'random',
    'threading',
    'traceback',
    'tracemalloc',
]


def _hook_import(name, *args, **kwargs):
    if name in safe_module_names:
        return __import__(name, *args, **kwargs)
    else:
        raise RuntimeError('invalid module, the module is ' + str(name))
_builtins = {'__import__':_hook_import}

従って,このホワイトリストを更新します。
自分が追加したのは以下の関数です。

  • モジュール: sys,socket,subprocess,contextlib
  • 関数: min,max

一応ファイルを置いておきます。変更したscript_manager.py

変更の後,adbを使ってこれを送りつけます。

 .\adb.exe push <ローカルパス>\script_manage.py /data/dji_scratch/lib

さて,これで一応Python側でコードを実行する準備ができました。

Video Capture

PC側からの制御のためになんとかRobomasterS1のVideo出力をCaptureしたいです。
配布SDKさえ対応してくれれば,特定のポートをListenするだけで良さそうなのですが,サポートへの問い合わせ待ちです。
(できたら教えて下さい。)

仕方がないので妥協策としてここでは,rosmasterのアプリの画面をキャプチャします

流れは,

  • OBS Studioを管理者権限で起動。(なぜならばRobomasterのアプリが管理者権限で起動するから)
  • シーンを追加,ソースから「ゲームキャプチャを追加」(下図参照)
  • 「ツール」→「Virtual Cam」からキャプチャした画像をBroadcast

となります。

RobomasCapture.png

OpenCVのVideoCaptureの番号を適当に探せば適切なStreamを見つけられるはずです。

自作チャンネル操作のプログラムはこちら。

実際の動作

Robomasters1のカメラに映る赤い物体をトラッキングしました。
手順としては

  1. コード実行画面からコード実行
  2. コード実行画面の右からcamera画像を表示させて全画面化
  3. Alt+Tabで画面切り替えしてトラッキングプログラムを起動

という工程で実行します。

RobomasterからPCへの通信と,OBSのキャプチャ,そのエクスポートと3段階のラグ要因を挟んではいますが,それなりに動作します。

PC側とUDP通信(後で)

socketを用いて書きます。

TCPも試しましたが,接続ロストしたりエラーはいた時の後処理がだるいのでUDPだけ載せます。

IP,ポート確認

この辺の調査は体当たりでもいいですが,adbのshellが開けているなら比較的簡単に調べられます。

IPは

ifconfig wlan0

を実行します。

netstatコマンドで使われているポートを確認しておきましょう。(本当はSDKモードが有効になっているなら5桁チャンネルがあるはずですが今後のアップデート次第です。)

1|root@xw607_dz_ap0002_v4:/ # netstat
Proto Recv-Q Send-Q Local Address          Foreign Address        State
 tcp       0      0 0.0.0.0:8905           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8906           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8907           0.0.0.0:*              LISTEN
 tcp       0      0 127.0.0.1:5037         0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8909           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8910           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8912           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8913           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:8916           0.0.0.0:*              LISTEN
 tcp       0      0 0.0.0.0:21             0.0.0.0:*              LISTEN
 udp       0      0 0.0.0.0:67             0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:67             0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:67             0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:10607          0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:58000          0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:35476          0.0.0.0:*              CLOSE
 udp       0      0 0.0.0.0:56789          0.0.0.0:*              CLOSE

ソケットプログラミング

UDPのほうが後始末が楽だったのでUDPでやります。

クライアント(PC)

マルチスレッドでやると画像処理などのメインの処理の負荷が多少下がるのでおすすめです。

import socket
import threading

class ClientThread(threading.Thread):
    def __init__(self, PORT=88888,HOST="192.168.100.111"):
        threading.Thread.__init__(self)
        self.kill_flag = False
        # line information
        self.HOST = HOST
        self.PORT = PORT
        self.BUFSIZE = 1024
        self.ADDR = (HOST, self.PORT)
        # tcp/udp
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def run(self):
        while True:
            try:
                global msg                
                # tcp
                #self.sock.connect(self.ADDR)
                #self.sock.send(msg.encode())
                # udp
                self.udpsock.sendto(msg.encode(), self.ADDR)
                time.sleep(0.001)
                print("send")
                # udp recv
                data, addr = self.udpsock.recvfrom(self.BUFSIZE)
                print(data)
            except:
                pass
                self.udpsock.close()
                #print("close")
                self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)


th = ClientThread(PORT=port,HOST=host)
th.setDaemon(True)
th.start()

サーバ(Robomasters1)

Classを実装しようとした所,やはり怒られたのでこっちはマルチスレッドは諦めました。

import socket

def run_server():
    # parameter
    host = '192.168.100.111'
    port = 88888
    backlog = 5
    buf_size = 1024
    timeout = 20
    #init TCP
    #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #sock.settimeout(timeout)
    udpServSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udpServSock.bind((host,port)) # HOST, PORTでbinding
    udpServSock.settimeout(timeout)
    while True: 
        #udp
        msg, cliaddr = udpServSock.recvfrom(buf_size) # データ受信
        # 何某かの処理
        data = "hoge"
        udpServSock.sendto(data.encode(),cliaddr)
    return

ロボットのコントロール(後で)

Pythonでロボットを動作させるときの簡単な関数の説明は以下にあります。

ジンバル制御:位置制御と速度制御

  • 右向きがYawの正,上向きがPitchの正
  • 操作モードのうち,gimbal_followモードかfreeモードが良さそう

以下のコードで動作を確認できると思います。

  • 位置制御確認プログラム
# Gimbal Lead
robot_ctrl.set_mode(rm_define.robot_mode_chassis_follow)

# Set chasis to follow Gimbal
chassis_ctrl.set_follow_gimbal_offset(0)
chassis_ctrl.set_rotate_speed(180)
gimbal_ctrl.set_rotate_speed(100)

# yaw positive is right side
gimbal_ctrl.yaw_ctrl(60)
gimbal_ctrl.yaw_ctrl(-60)
gimbal_ctrl.yaw_ctrl(0)
  • 速度制御確認プログラム
# Gimbal Lead
robot_ctrl.set_mode(rm_define.robot_mode_chassis_follow)

# Set chasis to follow Gimbal
chassis_ctrl.set_follow_gimbal_offset(0)
chassis_ctrl.set_rotate_speed(180)
gimbal_ctrl.set_rotate_speed(100)#無意味

gimbal_ctrl.rotate_with_speed(30, 10)
# 速度のOpenループは許されなかったorz
gimbal_ctrl.angle_ctrl(60, 15)

ビジュアルトラッキング(次回)

絶賛動作部分のバグに悩まされ中。今回の要素の組み合わせになります。

  • PCで画像を取得
  • 位置指令を作成
  • socketで通信
4
2
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?