USB to CAN モジュールをROS2で利用したCAN通信の実装例
本記事では、USB to CAN モジュール(Amazonで購入可能)を使用して、Ubuntu 22.04環境下でROS2を利用しCAN通信を実現する方法を紹介します.
システム環境と前提条件
- OS: Ubuntu 22.04
- ROS2: ROS2 Humble Hawksbill(またはご利用のROS2ディストリビューション)
- 言語: Python 3
-
ライブラリ:
- rclpy: ROS2のPythonクライアントライブラリ
- python-can: CAN通信のためのライブラリ
- struct, time: 標準ライブラリ
-
CANアダプタ: USB to CAN モジュール CANable2
購入した商品はこちら → Amazonリンク
導入はこちらを参考にしてください.
以下のPythonプログラムは、ROS2ノードとして動作し、USB to CAN モジュール経由でCANバス上のデータ送受信を行います.
-
CANバスの初期化:
SocketCANインターフェース(ここではcan0
)を使用してCANバスに接続. -
ROS2パブリッシャーとサブスクライバーの設定:
-
robot_position
トピックへ、CANバスから受信した座標や角度情報をFloat32MultiArray
形式でパブリッシュ. -
send_can_message
トピックから受信したデータをCANメッセージに変換し、送信.
-
-
CANメッセージの送受信:
-
送信: サブスクライバーのコールバックで受け取ったデータ(速度や角速度、指示番号)をパッキングして、2種類のCANメッセージ(アービトレーションID
0x160
と0x161
)として送信. - 受信: タイマーによって1ミリ秒ごとにCANバスをポーリングし、受信したメッセージの内容に応じてデータをアンパッキング、ROS2トピックにパブリッシュ.
-
送信: サブスクライバーのコールバックで受け取ったデータ(速度や角速度、指示番号)をパッキングして、2種類のCANメッセージ(アービトレーションID
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import can
import struct
import time
class CANNode(Node):
def __init__(self):
super().__init__('can_node')
# CANバスの設定
try:
self.bus = can.Bus(interface='socketcan', channel='can0')
except OSError as e:
self.get_logger().error(f"Could not access SocketCAN device can0: {e}")
rclpy.shutdown()
return
self.publisher_ = self.create_publisher(Float32MultiArray, 'robot_position', 10)
self.subscription = self.create_subscription(
Float32MultiArray,
'send_can_message',
self.send_can_message_callback,
10
)
self.timer = self.create_timer(0.001, self.timer_callback) # 1msに一回
self.action_number = 0
self.last_msg_time = self.get_clock().now()
def send_can_message_callback(self, msg):
self.get_logger().info(f"send_can_message_callback called with data: {msg.data}")
self.last_msg_time = self.get_clock().now()
if len(msg.data) == 4:
vx = float(msg.data[0]) # Vx
vy = float(msg.data[1]) # Vy
omega = float(msg.data[2]) # ω
action_number = int(msg.data[3]) # 指示番号
# action_numberを0から10の範囲に制限
action_number = max(0, min(10,action_number))
# CANデータのパッキング
data_160 = struct.pack('>hhh', int(vx), int(vy), int(omega))
can_msg_160 = can.Message(arbitration_id=0x160, data=data_160, is_extended_id=False)
try:
self.bus.send(can_msg_160) # CANバスに送信
self.get_logger().info(f"送信[0x160: {data_160.hex()}] 送信[{vx}, {vy}, {omega}]")
except can.CanError:
self.get_logger().error("Failed to send CAN message 0x160")
data_161 = struct.pack('>B', action_number)
can_msg_161 = can.Message(arbitration_id=0x161, data=data_161, is_extended_id=False)
try:
self.bus.send(can_msg_161) # CANバスに送信
self.get_logger().info(f"送信[0x161: {data_161.hex()}] 送信[Action Number: {action_number}]")
except can.CanError:
self.get_logger().error("Failed to send CAN message 0x161")
def timer_callback(self):
msg = self.bus.recv(timeout=0.001) # タイムアウトを短く設定
if msg is not None:
if msg.arbitration_id == 0x150:
self.get_logger().debug(f"Received CAN message: {msg}")
try:
# CANデータのアンパッキング
x = struct.unpack('>h', msg.data[0:2])[0]
y = struct.unpack('>h', msg.data[2:4])[0]
theta = struct.unpack('>h', msg.data[4:6])[0]
command_msg = Float32MultiArray()
command_msg.data = [float(x), float(y), float(theta), float(self.action_number)]
self.publisher_.publish(command_msg)
#self.get_logger().info(f"受信[0x150: {msg.data.hex()}] 受信[{x}, {y}, {theta}, {self.action_number}]")
except struct.error as e:
self.get_logger().error(f"Unpacking error: {e}")
elif msg.arbitration_id == 0x151:
self.get_logger().debug(f"Received CAN message: {msg}")
try:
self.action_number = struct.unpack('>B', msg.data)[0]
#self.get_logger().info(f"受信[0x151: {msg.data.hex()}] 受信[Action Number: {self.action_number}]")
except struct.error as e:
self.get_logger().error(f"Unpacking error: {e}")
elif msg.arbitration_id == 0x360:
self.get_logger().debug(f"Received CAN message: {msg}")
try:
# CANデータのアンパッキング
x = struct.unpack('>h', msg.data[0:2])[0]
y = struct.unpack('>h', msg.data[2:4])[0]
theta = struct.unpack('>h', msg.data[4:6])[0]
command_msg = Float32MultiArray()
command_msg.data = [float(x), float(y), float(theta), float(self.action_number)]
self.publisher_.publish(command_msg)
#self.get_logger().info(f"受信[0x360: {msg.data.hex()}] 受信[{x}, {y}, {theta}, {self.action_number}]")
except struct.error as e:
self.get_logger().error(f"Unpacking error: {e}")
def main(args=None):
rclpy.init(args=args)
node = CANNode()
if node.bus:
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
プログラム詳細解説
1. CANバスの初期化と接続
try:
self.bus = can.Bus(interface='socketcan', channel='can0')
except OSError as e:
self.get_logger().error(f"Could not access SocketCAN device can0: {e}")
rclpy.shutdown()
return
-
SocketCANの利用:
can.Bus
を利用して、SocketCANインターフェース(can0
)に接続します.
2. ROS2ノードとしての設定
-
パブリッシャー:
self.publisher_ = self.create_publisher(Float32MultiArray, 'robot_position', 10)
CANバスから受信した座標情報(x, y, θ)とアクション番号を、
robot_position
トピックにパブリッシュします. -
サブスクライバー:
self.subscription = self.create_subscription( Float32MultiArray, 'send_can_message', self.send_can_message_callback, 10 )
別のノードから
send_can_message
トピック経由で受信したデータをもとに、CANメッセージとして送信する処理を行います. -
タイマーによる定期処理:
self.timer = self.create_timer(0.001, self.timer_callback) # 1msごとに実行
1ミリ秒でタイマーが動作し、CANバスからの受信を定期的にチェックします.
3. CANメッセージの送信処理
サブスクライバーのコールバックsend_can_message_callback
では、受信したデータのうち、速度(Vx, Vy)、角速度(ω)、および指示番号を取り出し、以下のようにCANメッセージとして送信します.
-
データのパッキング:
data_160 = struct.pack('>hhh', int(vx), int(vy), int(omega))
3つの値を大きいエンディアンでそれぞれ16ビットの整数としてパッキング.
同様に、指示番号は1バイトとしてパッキングされます. -
CANメッセージの送信:
送信するCANメッセージは、アービトレーションID0x160
と0x161
を持ち、各々の送信結果についてログを出力することで、動作の確認が容易になっています.
4. CANメッセージの受信処理
タイマーコールバックtimer_callback
では、CANバスからの受信をポーリングし、以下のアービトレーションIDに応じた処理を行います.
-
ID
0x150
および0x360
:
これらのメッセージは、2バイトずつの値(x, y, θ)を含むと仮定し、struct.unpack
を使ってアンパッキングした後、robot_position
トピックへパブリッシュします. -
ID
0x151
:
ここでは、アクション番号を更新するためのメッセージとして処理され、今後の送信データに反映されます.