LoginSignup
8
3

More than 3 years have passed since last update.

IMUセンサのROS2ノードを作成

Posted at

今回は,ROS2でIMUセンサを使いたいと思い,WitMotionのBWT901CLをセンサノードにする.

はじめに

環境

今回の環境は以下の通り.

  • Ubuntu:20.04 on Docker
  • ROS2:Foxy
  • IMUセンサ:BWT901CL(WitMotion)
  • 既にROS2をインストールし,チュートリアルを動かしている

先人の知恵

この記事を記載している現在,公式からはPythonコードを提供していなかったので作る.ただし,既にPythonコードを書かれている方がいたので感謝しながら使わせてもらう.

参考にしたコードには2点の問題点があった.

  1. ROS2ノードがスリープしたときのUARTの同期が崩れる対策がされていない
  2. UART通信で受け取った2Byteバイナリデータを変換するときに,符号付きのオプションがなかったため,正しい値が読み込めない.

これら2点の問題に対策したコードを作った.

今回のコード

仕事の終わりのスキマ時間で作ったので,テキトーな点は目をつぶってほしい.

コードの解説

センサノードを作る目的のほかに,ROS2パッケージとノードとメッセージの3点を作る勉強も兼ねている.

ファイル構成

ros2_ws
 └ src
    └ bwt901cl_pkg
       ├ src
       │  ├ __init__.py
       │  └ bwt901cl.py
       ├ __init__.py
       ├ imu_bwt901cl.py
       ├ test
       ├ resource
       ├  └ bwt901cl_pkg
       ├ package.xml
       ├ setup.cfg
       └ setup.py

ここで,UART通信を行うPythonモジュールはbwt901cl.pyであり,ROS2のPublisherノードはimu_bwt901cl.pyである.

UART通信を行うPythonモジュール

bwt901cl.py
from serial import Serial
from time import sleep

class BWT901CL(Serial):
    def __init__(self, Port):
        self.myserial = super().__init__(Port, baudrate = 115200, timeout = 1)

        self.angular_velocity_x = 0.0
        self.angular_velocity_y = 0.0
        self.angular_velocity_z = 0.0
        self.angle_x = 0.0
        self.angle_y = 0.0
        self.angle_z = 0.0
        self.accel_x = 0.0
        self.accel_y = 0.0
        self.accel_z = 0.0
        self.Temp = 0.0
        self.magnetic_x = 0.0
        self.magnetic_y = 0.0
        self.magnetic_z = 0.0
        self.quaternion_x = 0.0
        self.quaternion_y = 0.0
        self.quaternion_z = 0.0
        self.quaternion_w = 1.0
        self.YY = 0
        self.MM = 0
        self.DD = 0
        self.hh = 0
        self.mm = 0
        self.ss = 0
        self.ms = 0
        self.D0 = 0
        self.D1 = 0
        self.D2 = 0
        self.D3 = 0

        while True:
            data = super(BWT901CL, self).read(size=1)
            if data == b'\x55':  # UARTのスタートBITを探索
                data = super(BWT901CL, self).read(size=10)
                print("success!")
                print(bytes(data))
                break
            print("trying", data)
        #print(super(BWT901CL, self).isOpen())
        self._readData()

    def _readData(self):
        try:
            for i in range(6):
                data = super(BWT901CL, self).read(size=11)

                if not len(data) == 11:
                    print('byte error:', len(data))
                    break
                if not data[0] == 0x55: # スタートBITに異常がある場合読み込み中止
                    print('UART sync error:', bytes(data))
                    break
               #Time
                if data[1] == 0x50:
                    self.YY = data[2]
                    self.MM = data[3]
                    self.DD = data[4]
                    self.hh = data[5]
                    self.mm = data[6]
                    self.ss = data[7]
                    self.ms = int.from_bytes(data[8:10], byteorder='little')

                #Acceleration
                elif data[1] == 0x51:
                    # signed=Trueオプションを忘れると,符号なしで認識されてバグることがある.
                    self.accel_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768.0*16.0*9.8
                    self.accel_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768.0*16.0*9.8
                    self.accel_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768.0*16.0*9.8
                    self.Temp = int.from_bytes(data[8:10], byteorder='little', signed=True)/340.0+36.25

                #Angular velocity
                elif data[1] == 0x52:
                    self.angular_velocity_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768*2000.0
                    self.angular_velocity_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768*2000.0
                    self.angular_velocity_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768*2000.0
                    self.Temp = int.from_bytes(data[8:10], byteorder='little')/340.0+36.25

                #Angle
                elif data[1] == 0x53:
                    self.angle_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768*180
                    self.angle_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768*180
                    self.angle_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768*180

                #Magnetic
                elif data[1] == 0x54:
                    self.magnetic_x = int.from_bytes(data[2:4], byteorder='little', signed=True)
                    self.magnetic_y = int.from_bytes(data[4:6], byteorder='little', signed=True)
                    self.magnetic_z = int.from_bytes(data[6:8], byteorder='little', signed=True)

                #Quatrernion
                elif data[1] == 0x59:
                    self.quaternion_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768
                    self.quaternion_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768
                    self.quaternion_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768
                    self.quaternion_w = int.from_bytes(data[8:10], byteorder='little', signed=True)/32768

                #Data output port status
                elif data[1] == 0x55:
                    self.D0 = int.from_bytes(data[2:4], byteorder='little',signed=True)
                    self.D1 = int.from_bytes(data[4:6], byteorder='little',signed=True)
                    self.D2 = int.from_bytes(data[6:8], byteorder='little',signed=True)
                    self.D3 = int.from_bytes(data[8:10], byteorder='little',signed=True)

        except KeyboardInterrupt:
            super(BWT901CL, self).close()

    def getAngle(self):
        self._readData()
        return (self.angle_x, self.angle_y, self.angle_z)

    def getAnglurVelocity(self):
        self._readData()
        return (self.angular_velocity_x, self.angular_velocity_y, self.angular_velocity_z)

    def getAccel(self):
        self._readData()
        return (self.accel_x, self.accel_y, self.accel_z)

    def getTemperature(self):
        return self.Temp

    def getData(self):
        super(BWT901CL, self).reset_input_buffer()
        is_not_sync = True
        while is_not_sync:
            data = super(BWT901CL, self).read(size=1)
            if data == b'\x55': # UARTの同期エラーをリカバリ
                data = super(BWT901CL, self).read(size=10)
                is_not_sync = False
                break

        self._readData()
        return  (self.angle_x, self.angle_y, self.angle_z), \
                (self.angular_velocity_x, self.angular_velocity_y, self.angular_velocity_z), \
                (self.accel_x, self.accel_y, self.accel_z), \
                self.Temp, \
                (self.magnetic_x, self.magnetic_y, self.magnetic_z), \
                (self.quaternion_x, self.quaternion_y, self.quaternion_z, self.quaternion_w), \
                (self.MM, self.DD, self.hh, self.mm, self.ss, self.ms)

if __name__ == "__main__":

    jy_sensor =  BWT901CL("/dev/ttyUSB0")
    #sleep(1)
    #jy_sensor.wake_up()
    while True:
        #print('Angle: ', jy_sensor.getAngle())
        #print('Anglular velocity: ', jy_sensor.getAnglurVelocity())
        sleep(0.5)
        jy_sensor.getData()

ROS2のPublisherノード

ノードの構成

以下に示すノード1つだけを作った.
1. imu_bwt901cl

メッセージの構成

公式の共通メッセージのGithubを参考に,センサのメッセージを4つ作った.
1. /sensor/bwt901cl/Imu(Imu)
2. /sensor/bwt901cl/MagneticField(MagneticField)
3. /sensor/bwt901cl/Temperature(Temperature)
4. /sensor/bwt901cl/Angle(Vector3)

書いたコード

imu_bwt901cl.py

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, Temperature, MagneticField
from geometry_msgs.msg import Vector3
import subprocess

from .src.bwt901cl import BWT901CL

class Imu901cl(Node):
    def __init__(self, time_interval=1.0):
        super().__init__('imu_bwt901cl')
        self.pub_imu = self.create_publisher(Imu, '/sensor/bwt901cl/Imu', 10)
        self.pub_mag = self.create_publisher(MagneticField, '/sensor/bwt901cl/MagneticField', 10)
        self.pub_tmp = self.create_publisher(Temperature, '/sensor/bwt901cl/Temperature', 10)
        self.pub_ang = self.create_publisher(Vector3, '/sensor/bwt901cl/Angle', 10)
        self.tmr = self.create_timer(time_interval, self.timer_callback)
        subprocess.call("sudo chmod 777 /dev/ttyUSB0", shell=True)
        self.imu_sensor =  BWT901CL("/dev/ttyUSB0")

    def timer_callback(self):
        msg_imu = Imu()
        msg_mag = MagneticField()
        msg_tmp = Temperature()
        msg_ang = Vector3()

        angle, angular_velocity, accel, temp, magnetic, quaternion, time = self.imu_sensor.getData()

        msg_tmp.temperature = temp
        self.pub_tmp.publish(msg_tmp)

        msg_mag.magnetic_field.x = float(magnetic[0])
        msg_mag.magnetic_field.y = float(magnetic[1])
        msg_mag.magnetic_field.z = float(magnetic[2])
        self.pub_mag.publish(msg_mag)

        msg_imu.orientation.x = quaternion[0]
        msg_imu.orientation.y = quaternion[1]
        msg_imu.orientation.z = quaternion[2]
        msg_imu.orientation.w = quaternion[3]
        msg_imu.angular_velocity.x = angular_velocity[0]
        msg_imu.angular_velocity.y = angular_velocity[1]
        msg_imu.angular_velocity.z = angular_velocity[2]
        msg_imu.linear_acceleration.x = accel[0]
        msg_imu.linear_acceleration.y = accel[1]
        msg_imu.linear_acceleration.z = accel[2]
        self.pub_imu.publish(msg_imu)

        msg_ang.x = angle[0]
        msg_ang.y = angle[1]
        msg_ang.z = angle[2]
        self.pub_ang.publish(msg_ang)

        #print("Time:", time)
        #print("th:", angle)
        #print("d_th: ", angular_velocity)
        #print("d_x: ", accel)
        #print("mag: ", magnetic)
        #print("tmp: ", temp)
        #print(quaternion)

def main(args=None):    
    print('Hi from bwt901cl_pkg.')

    rclpy.init(args=args)
    node_imu_bwt901cl = Imu901cl(time_interval=0.1)
    rclpy.spin(node_imu_bwt901cl)

    node_imu_bwt901cl.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

自分で作成したPythonモジュールを登録

py_modules=[]を使って,作成したモジュールbwt901cl.pyを登録し忘れると,colcon build時にinstallフォルダにモジュールがコピーされず,正常に動作しない点に注意する.

setup.py
from setuptools import setup

package_name = 'bwt901cl_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='dorebom',
    maintainer_email='dorebom.b@gmail.com',
    description='Package for using sensor module BWT901CL',
    license='MIT License',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'imu_bwt901cl = bwt901cl_pkg.imu_bwt901cl:main'
        ],
    },
    py_modules=["bwt901cl_pkg.src.bwt901cl"]
)

今後の課題

現在のコードは読み込みが一定周期ではない.また時間があるときに改善を行うつもりである.

参考にしたページ

8
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3