3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DonkeyCarのマイシミュレータを作ろう DonkeyCar実装編

Posted at

はじめに

環境構築編の続きです。
シミュレーション上でDonkeyCarを走らせて学習、自動運転まで行います。

実装方針

以下がDonkeyCarの構造の概略図です。図のように、カメラ映像の取得部分と制御信号を送る部分に修正を加えます。
そのためにROS2で通信を行うためのros2.pyパーツを追加し、manage.pyへ変更を加えて動作するようにします。
image.png

作業の流れ

1.DonkeyCarのインストール
2.ROS2Partsの追加
3.Manage.pyの修正
4.動作確認
5.学習データ取得
6.学習

DonkeyCarのインストール

DonkeyCar公式ページに従ってインストールします。
動作確認バージョンは、 ですが、アーキテクチャ構造が変わらなければ、どのバージョンでも対応できると思います。

セットアップの準備を行います。

sudo apt update
sudo apt upgrade
sudo apt install python3-pip
sudo apt install libmtdev1 xclip
mkdir projects
cd projects
git clone https://github.com/autorope/donkeycar
cd donkeycar
git checkout 4.3.6.2

GPUを使わない場合

pip install tensorflow==2.5.0
pip install -e .[pc]
pip install protobuf==3.20.1
pip install numpy== 1.20.3
pip install torch torchvision torchaudio
pip install pytorch_lightning
pip install fastai

※GPUを使う場合は、以下を参考にセットアップを行ってください。
https://zenn.dev/ylabo0717/articles/48796b7f3470c7

DonkeyCarのテンプレートを作成します。

donkey createcar --path ~/mycar
cd ~/mycar

※インストール後でパスが通っていない場合は、別のターミナルを開いて実行してください。

動作確認のために、myconfig.pyを修正します。お好みのエディタで作業を行ってください。

nano myconfig.py

動作確認のために以下の設定を変更し、カメラタイプとドライブタイプをいったん無効化します。

CAMERA_TYPE = "MOCK"   # (PICAM|WEBCAM|CVCAM|CSIC|V4L|D435|MOCK|IMAGE_LIST)
DRIVE_TRAIN_TYPE = "MOCK"

manage.pyを起動して動作を確認します。

python3 manage.py dirve
________             ______                   _________
___  __ \_______________  /___________  __    __  ____/_____ ________
__  / / /  __ \_  __ \_  //_/  _ \_  / / /    _  /    _  __ `/_  ___/
_  /_/ // /_/ /  / / /  ,<  /  __/  /_/ /     / /___  / /_/ /_  /
/_____/ \____//_/ /_//_/|_| \___/_\__, /      \____/  \__,_/ /_/
                                 /____/

using donkey v4.3.6 ...
WARNING:donkeycar.parts.pins:RPi.GPIO was not imported.
WARNING:donkeycar.parts.pins:pigpio was not imported.
loading config file: /home/ubuntu/mycar/config.py
loading personal config over-rides from myconfig.py
INFO:__main__:PID: 11076
cfg.CAMERA_TYPE MOCK
INFO:__main__:cfg.CAMERA_TYPE MOCK
INFO:donkeycar.vehicle:Adding part MockCamera.
Starting Donkey Server...... you can now go to DESKTOP-RQQVNCB.local:8887 to drive your car.
INFO:donkeycar.vehicle:Adding part LocalWebController.
INFO:donkeycar.vehicle:Adding part ThrottleFilter.
INFO:donkeycar.vehicle:Adding part PilotCondition.
INFO:donkeycar.vehicle:Adding part RecordTracker.
INFO:donkeycar.vehicle:Adding part DriveMode.
INFO:donkeycar.vehicle:Adding part AiLaunch.
INFO:donkeycar.vehicle:Adding part AiRunCondition.
Using catalog /home/ubuntu/mycar/data/catalog_0.catalog
INFO:donkeycar.vehicle:Adding part TubWriter.
You can now go to <your hostname.local>:8887 to drive your car.
INFO:donkeycar.vehicle:Starting vehicle at 20 Hz

localhost:8887にアクセスしてWebUIが起動していることを確認します。
image.png

2.ROS2Partsの追加

~/projects/donkeycar/donkeycar/partsにros2.pyを作成します。
アクセルとステアリングをUnity側に送信するRosTwistPublisherクラスとUnityからのカメラ映像を受信するRos2Subscriberクラスを用意します。

from time import sleep
import rclpy
from std_msgs.msg import String, Int32, Float32
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Image 
from cv_bridge import CvBridge
import cv2

class RosTwistPublisher(object):
    def __init__(self,node_name,topic_name,args=None,stream_type=Twist, anonymous=True):
        self.node = rclpy.create_node(node_name)
        self.publisher = self.node.create_publisher(stream_type,'cmd_vel',10)
        self.twist = Twist()

    def run(self, x,z):
        self.twist.linear.x = float(x)
        self.twist.angular.z = float(z)

        self.publisher.publish(self.twist)


class Ros2Subscriber(object):
    def listener_callback(self, msg):
        bridge = CvBridge()
        img = bridge.imgmsg_to_cv2(msg, "bgr8")
        img = cv2.resize(img, dsize=(160, 120))
        self.frame = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

    def cmd_listener_callback(self, Twist):
        self.linear = Twist.linear.x
        self.angular = Twist.angular.z

    def __init__(self,args=None):
        self.linear = 0.0
        self.angular = 0.0
        self.frame = None
        bridge = CvBridge()
        rclpy.init(args=args)
        self.node = rclpy.create_node('minimal_subscriber')
        subscription = self.node.create_subscription(
            Image, 'image_raw', self.listener_callback, 10)
        subscription  # prevent unused variable warning

        subscription2 = self.node.create_subscription(Twist,'unity_cmd_vel', self.cmd_listener_callback,5)
        subscription2  # prevent unused variable warning

    def run(self):
        return self.frame,self.linear,self.angular

    def run_threaded(self):
        return self.frame,self.linear,self.angular

    def update(self):
        rclpy.spin(self.node)
        subscriber.destroy_node()
        rclpy.shutdown()

3.manage.pyの修正

まず、Unity側から送られてくるカメラ映像を受け取る部分を作ります。
以下のコードのうち、#ROS2 Subscribe部分を追加します。

        inputs = []
        outputs = ['cam/image_array']
        threaded = True
        if cfg.DONKEY_GYM:

            from donkeycar.parts.dgym import DonkeyGymEnv 
            #rbx
            cam = DonkeyGymEnv(cfg.DONKEY_SIM_PATH, host=cfg.SIM_HOST, env_name=cfg.DONKEY_GYM_ENV_NAME, conf=cfg.GYM_CONF, record_location=cfg.SIM_RECORD_LOCATION, record_gyroaccel=cfg.SIM_RECORD_GYROACCEL, record_velocity=cfg.SIM_RECORD_VELOCITY, record_lidar=cfg.SIM_RECORD_LIDAR, delay=cfg.SIM_ARTIFICIAL_LATENCY)
            threaded = True
            inputs = ['angle', 'throttle']
        elif cfg.CAMERA_TYPE == "PICAM":
            from donkeycar.parts.camera import PiCamera
            cam = PiCamera(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH, framerate=cfg.CAMERA_FRAMERATE, vflip=cfg.CAMERA_VFLIP, hflip=cfg.CAMERA_HFLIP)
        elif cfg.CAMERA_TYPE == "WEBCAM":
            from donkeycar.parts.camera import Webcam
            cam = Webcam(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH, camera_index=cfg.CAMERA_INDEX)
        elif cfg.CAMERA_TYPE == "CVCAM":    from donkeycar.parts.cv import CvCam
            cam = CvCam(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH, iCam=cfg.CAMERA_INDEX)
        elif cfg.CAMERA_TYPE == "CSIC":
            from donkeycar.parts.camera import CSICamera
            cam = CSICamera(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH, framerate=cfg.CAMERA_FRAMERATE, 
                            capture_width=cfg.IMAGE_W, capture_height=cfg.IMAGE_H, gstreamer_flip=cfg.CSIC_CAM_GSTREAMER_FLIP_PARM)
        elif cfg.CAMERA_TYPE == "V4L":
            from donkeycar.parts.camera import V4LCamera
            cam = V4LCamera(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH, framerate=cfg.CAMERA_FRAMERATE)
        elif cfg.CAMERA_TYPE == "MOCK":
            from donkeycar.parts.camera import MockCamera
            cam = MockCamera(image_w=cfg.IMAGE_W, image_h=cfg.IMAGE_H, image_d=cfg.IMAGE_DEPTH)
        elif cfg.CAMERA_TYPE == "IMAGE_LIST":
            from donkeycar.parts.camera import ImageListCamera
            cam = ImageListCamera(path_mask=cfg.PATH_MASK)
        elif cfg.CAMERA_TYPE == "LEOPARD":
            from donkeycar.parts.leopard_imaging import LICamera
            cam = LICamera(width=cfg.IMAGE_W, height=cfg.IMAGE_H, fps=cfg.CAMERA_FRAMERATE)
        #ROS2 Subscribe
        elif cfg.CAMERA_TYPE == "ROS2CAM":
            from donkeycar.parts.ros2 import Ros2Subscriber
            cam = Ros2Subscriber()
            outputs += ['unity/throttle','unity/angle']
        else:
            raise(Exception("Unkown camera type: %s" % cfg.CAMERA_TYPE))

次に、Unity側に制御信号を送信する部分を追加します。
以下のコードのうち、#ROS2 Publish部分を追加します。

    elif cfg.DRIVE_TRAIN_TYPE == "VESC":
        from donkeycar.parts.actuator import VESC
        logger.info("Creating VESC at port {}".format(cfg.VESC_SERIAL_PORT))
        vesc = VESC(cfg.VESC_SERIAL_PORT,
                      cfg.VESC_MAX_SPEED_PERCENT,
                      cfg.VESC_HAS_SENSOR,
                      cfg.VESC_START_HEARTBEAT,
                      cfg.VESC_BAUDRATE, 
                      cfg.VESC_TIMEOUT,
                      cfg.VESC_STEERING_SCALE,
                      cfg.VESC_STEERING_OFFSET
                    )
        V.add(vesc, inputs=['angle', 'throttle'])

    #ROS2 Publish
    elif cfg.DRIVE_TRAIN_TYPE == "ROS2_DRIVE":
        from donkeycar.parts.ros2 import RosTwistPubisher
        ctr = RosTwistPubisher(node_name='speed',topic_name='cmd_vel')
        V.add(ctr,inputs=['throttle','angle'])

修正に合わせてmyconfig.pyを修正します。
以下のそれぞれのオプションを変更しましょう。

CAMERA_TYPE = "ROS2CAM" 
DRIVE_TRAIN_TYPE = "ROS2_DRIVE"

4.動作確認

シミュレータ環境を起動した後に、以下のコマンドでDonkeyCarを起動します。

python3 manage.py drive

image.png
Ros2SubscriberとROSTwistPublisherが起動していることが確認できます。
http://localhost:8887/drive
にアクセスしてWebUIを起動しましょう。シミュレータ映像が映し出されていれば成功です!
image.png
Joystickを操作するとマシンが動くことを確認できます。これにより、Unityから映像、DonkeyCarからアクセルとステアリングの制御信号を送ることが出来ました。

前進することによって/data/imagesに画像データが保存も出来ています。

5.学習データ取得

学習用のデータを取得しましょう。Joystickだと操作しにくいと思いますので、ゲームコントローラを使うことをお勧めします。

ゲームコントローラによる操作

USBゲームコントローラをPCに接続してWebUI画面上のGamepadボタンをクリックしてコントロールモードを切り替えます。そうすると、コントローラによる操作が行えます。
image.png
ゲームコントローラを有効にするには、WebUIもWindow上に表示させる必要があるので、Unity画面とブラウザ画面を表示するようにしましょう。
image.png

とりあえずこの状態で周回して学習データを集めましょう。約10~15周ぐらいが目安です。
マシンをスタート位置に戻りたい場合は、Unityの停止→再生ボタンをクリックしてください。

6.学習

学習は、通常通りのコマンドで行います。ここから先は、通常のDonkeyCarと同じはずです。

cd mycar
python3 train.py --tub data/ --model models/mypilot.h5

7.自動運転

学習できたモデルを活用して自動運転を行いましょう。

python manage.py drive --model ./models/mypilot.h5

うまく学習ができていると以下のように自動で走り出します。

最後に

最後まで読んでいただきありがとうございます!
この考え方をベースにすることでJetracerもシミュレーションが作れます!
シミュレーションライフを楽しみましょう!

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?