今回は,ROS2でIMUセンサを使いたいと思い,WitMotionのBWT901CLをセンサノードにする.
はじめに
環境
今回の環境は以下の通り.
- Ubuntu:20.04 on Docker
- ROS2:Foxy
- IMUセンサ:BWT901CL(WitMotion)
- 既にROS2をインストールし,チュートリアルを動かしている
先人の知恵
この記事を記載している現在,公式からはPythonコードを提供していなかったので作る.ただし,既にPythonコードを書かれている方がいたので感謝しながら使わせてもらう.
参考にしたコードには2点の問題点があった.
- ROS2ノードがスリープしたときのUARTの同期が崩れる対策がされていない
- UART通信で受け取った2Byteバイナリデータを変換するときに,符号付きのオプションがなかったため,正しい値が読み込めない.
これら2点の問題に対策したコードを作った.
今回のコード
仕事の終わりのスキマ時間で作ったので,テキトーな点は目をつぶってほしい.
コードの解説
センサノードを作る目的のほかに,ROS2パッケージとノードとメッセージの3点を作る勉強も兼ねている.
ファイル構成
ros2_ws
└ src
└ bwt901cl_pkg
├ src
│ ├ __init__.py
│ └ bwt901cl.py
├ __init__.py
├ imu_bwt901cl.py
├ test
├ resource
├ └ bwt901cl_pkg
├ package.xml
├ setup.cfg
└ setup.py
ここで,UART通信を行うPythonモジュールはbwt901cl.py
であり,ROS2のPublisherノードはimu_bwt901cl.py
である.
UART通信を行うPythonモジュール
from serial import Serial
from time import sleep
class BWT901CL(Serial):
def __init__(self, Port):
self.myserial = super().__init__(Port, baudrate = 115200, timeout = 1)
self.angular_velocity_x = 0.0
self.angular_velocity_y = 0.0
self.angular_velocity_z = 0.0
self.angle_x = 0.0
self.angle_y = 0.0
self.angle_z = 0.0
self.accel_x = 0.0
self.accel_y = 0.0
self.accel_z = 0.0
self.Temp = 0.0
self.magnetic_x = 0.0
self.magnetic_y = 0.0
self.magnetic_z = 0.0
self.quaternion_x = 0.0
self.quaternion_y = 0.0
self.quaternion_z = 0.0
self.quaternion_w = 1.0
self.YY = 0
self.MM = 0
self.DD = 0
self.hh = 0
self.mm = 0
self.ss = 0
self.ms = 0
self.D0 = 0
self.D1 = 0
self.D2 = 0
self.D3 = 0
while True:
data = super(BWT901CL, self).read(size=1)
if data == b'\x55': # UARTのスタートBITを探索
data = super(BWT901CL, self).read(size=10)
print("success!")
print(bytes(data))
break
print("trying", data)
#print(super(BWT901CL, self).isOpen())
self._readData()
def _readData(self):
try:
for i in range(6):
data = super(BWT901CL, self).read(size=11)
if not len(data) == 11:
print('byte error:', len(data))
break
if not data[0] == 0x55: # スタートBITに異常がある場合読み込み中止
print('UART sync error:', bytes(data))
break
#Time
if data[1] == 0x50:
self.YY = data[2]
self.MM = data[3]
self.DD = data[4]
self.hh = data[5]
self.mm = data[6]
self.ss = data[7]
self.ms = int.from_bytes(data[8:10], byteorder='little')
#Acceleration
elif data[1] == 0x51:
# signed=Trueオプションを忘れると,符号なしで認識されてバグることがある.
self.accel_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768.0*16.0*9.8
self.accel_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768.0*16.0*9.8
self.accel_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768.0*16.0*9.8
self.Temp = int.from_bytes(data[8:10], byteorder='little', signed=True)/340.0+36.25
#Angular velocity
elif data[1] == 0x52:
self.angular_velocity_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768*2000.0
self.angular_velocity_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768*2000.0
self.angular_velocity_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768*2000.0
self.Temp = int.from_bytes(data[8:10], byteorder='little')/340.0+36.25
#Angle
elif data[1] == 0x53:
self.angle_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768*180
self.angle_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768*180
self.angle_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768*180
#Magnetic
elif data[1] == 0x54:
self.magnetic_x = int.from_bytes(data[2:4], byteorder='little', signed=True)
self.magnetic_y = int.from_bytes(data[4:6], byteorder='little', signed=True)
self.magnetic_z = int.from_bytes(data[6:8], byteorder='little', signed=True)
#Quatrernion
elif data[1] == 0x59:
self.quaternion_x = int.from_bytes(data[2:4], byteorder='little', signed=True)/32768
self.quaternion_y = int.from_bytes(data[4:6], byteorder='little', signed=True)/32768
self.quaternion_z = int.from_bytes(data[6:8], byteorder='little', signed=True)/32768
self.quaternion_w = int.from_bytes(data[8:10], byteorder='little', signed=True)/32768
#Data output port status
elif data[1] == 0x55:
self.D0 = int.from_bytes(data[2:4], byteorder='little',signed=True)
self.D1 = int.from_bytes(data[4:6], byteorder='little',signed=True)
self.D2 = int.from_bytes(data[6:8], byteorder='little',signed=True)
self.D3 = int.from_bytes(data[8:10], byteorder='little',signed=True)
except KeyboardInterrupt:
super(BWT901CL, self).close()
def getAngle(self):
self._readData()
return (self.angle_x, self.angle_y, self.angle_z)
def getAnglurVelocity(self):
self._readData()
return (self.angular_velocity_x, self.angular_velocity_y, self.angular_velocity_z)
def getAccel(self):
self._readData()
return (self.accel_x, self.accel_y, self.accel_z)
def getTemperature(self):
return self.Temp
def getData(self):
super(BWT901CL, self).reset_input_buffer()
is_not_sync = True
while is_not_sync:
data = super(BWT901CL, self).read(size=1)
if data == b'\x55': # UARTの同期エラーをリカバリ
data = super(BWT901CL, self).read(size=10)
is_not_sync = False
break
self._readData()
return (self.angle_x, self.angle_y, self.angle_z), \
(self.angular_velocity_x, self.angular_velocity_y, self.angular_velocity_z), \
(self.accel_x, self.accel_y, self.accel_z), \
self.Temp, \
(self.magnetic_x, self.magnetic_y, self.magnetic_z), \
(self.quaternion_x, self.quaternion_y, self.quaternion_z, self.quaternion_w), \
(self.MM, self.DD, self.hh, self.mm, self.ss, self.ms)
if __name__ == "__main__":
jy_sensor = BWT901CL("/dev/ttyUSB0")
#sleep(1)
#jy_sensor.wake_up()
while True:
#print('Angle: ', jy_sensor.getAngle())
#print('Anglular velocity: ', jy_sensor.getAnglurVelocity())
sleep(0.5)
jy_sensor.getData()
ROS2のPublisherノード
ノードの構成
以下に示すノード1つだけを作った.
- imu_bwt901cl
メッセージの構成
公式の共通メッセージのGithubを参考に,センサのメッセージを4つ作った.
- /sensor/bwt901cl/Imu(Imu)
- /sensor/bwt901cl/MagneticField(MagneticField)
- /sensor/bwt901cl/Temperature(Temperature)
- /sensor/bwt901cl/Angle(Vector3)
書いたコード
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, Temperature, MagneticField
from geometry_msgs.msg import Vector3
import subprocess
from .src.bwt901cl import BWT901CL
class Imu901cl(Node):
def __init__(self, time_interval=1.0):
super().__init__('imu_bwt901cl')
self.pub_imu = self.create_publisher(Imu, '/sensor/bwt901cl/Imu', 10)
self.pub_mag = self.create_publisher(MagneticField, '/sensor/bwt901cl/MagneticField', 10)
self.pub_tmp = self.create_publisher(Temperature, '/sensor/bwt901cl/Temperature', 10)
self.pub_ang = self.create_publisher(Vector3, '/sensor/bwt901cl/Angle', 10)
self.tmr = self.create_timer(time_interval, self.timer_callback)
subprocess.call("sudo chmod 777 /dev/ttyUSB0", shell=True)
self.imu_sensor = BWT901CL("/dev/ttyUSB0")
def timer_callback(self):
msg_imu = Imu()
msg_mag = MagneticField()
msg_tmp = Temperature()
msg_ang = Vector3()
angle, angular_velocity, accel, temp, magnetic, quaternion, time = self.imu_sensor.getData()
msg_tmp.temperature = temp
self.pub_tmp.publish(msg_tmp)
msg_mag.magnetic_field.x = float(magnetic[0])
msg_mag.magnetic_field.y = float(magnetic[1])
msg_mag.magnetic_field.z = float(magnetic[2])
self.pub_mag.publish(msg_mag)
msg_imu.orientation.x = quaternion[0]
msg_imu.orientation.y = quaternion[1]
msg_imu.orientation.z = quaternion[2]
msg_imu.orientation.w = quaternion[3]
msg_imu.angular_velocity.x = angular_velocity[0]
msg_imu.angular_velocity.y = angular_velocity[1]
msg_imu.angular_velocity.z = angular_velocity[2]
msg_imu.linear_acceleration.x = accel[0]
msg_imu.linear_acceleration.y = accel[1]
msg_imu.linear_acceleration.z = accel[2]
self.pub_imu.publish(msg_imu)
msg_ang.x = angle[0]
msg_ang.y = angle[1]
msg_ang.z = angle[2]
self.pub_ang.publish(msg_ang)
#print("Time:", time)
#print("th:", angle)
#print("d_th: ", angular_velocity)
#print("d_x: ", accel)
#print("mag: ", magnetic)
#print("tmp: ", temp)
#print(quaternion)
def main(args=None):
print('Hi from bwt901cl_pkg.')
rclpy.init(args=args)
node_imu_bwt901cl = Imu901cl(time_interval=0.1)
rclpy.spin(node_imu_bwt901cl)
node_imu_bwt901cl.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
自分で作成したPythonモジュールを登録
py_modules=[]
を使って,作成したモジュールbwt901cl.py
を登録し忘れると,colcon build
時にinstall
フォルダにモジュールがコピーされず,正常に動作しない点に注意する.
from setuptools import setup
package_name = 'bwt901cl_pkg'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='dorebom',
maintainer_email='dorebom.b@gmail.com',
description='Package for using sensor module BWT901CL',
license='MIT License',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'imu_bwt901cl = bwt901cl_pkg.imu_bwt901cl:main'
],
},
py_modules=["bwt901cl_pkg.src.bwt901cl"]
)
今後の課題
現在のコードは読み込みが一定周期ではない.また時間があるときに改善を行うつもりである.
参考にしたページ