2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

USB to CAN モジュールをROS2で

Posted at

USB to CAN モジュールをROS2で利用したCAN通信の実装例

本記事では、USB to CAN モジュール(Amazonで購入可能)を使用して、Ubuntu 22.04環境下でROS2を利用しCAN通信を実現する方法を紹介します.

システム環境と前提条件

  • OS: Ubuntu 22.04
  • ROS2: ROS2 Humble Hawksbill(またはご利用のROS2ディストリビューション)
  • 言語: Python 3
  • ライブラリ:
    • rclpy: ROS2のPythonクライアントライブラリ
    • python-can: CAN通信のためのライブラリ
    • struct, time: 標準ライブラリ
  • CANアダプタ: USB to CAN モジュール CANable2
    購入した商品はこちら → Amazonリンク

導入はこちらを参考にしてください.


以下のPythonプログラムは、ROS2ノードとして動作し、USB to CAN モジュール経由でCANバス上のデータ送受信を行います.

  • CANバスの初期化:
    SocketCANインターフェース(ここではcan0)を使用してCANバスに接続.

  • ROS2パブリッシャーとサブスクライバーの設定:

    • robot_positionトピックへ、CANバスから受信した座標や角度情報をFloat32MultiArray形式でパブリッシュ.
    • send_can_messageトピックから受信したデータをCANメッセージに変換し、送信.
  • CANメッセージの送受信:

    • 送信: サブスクライバーのコールバックで受け取ったデータ(速度や角速度、指示番号)をパッキングして、2種類のCANメッセージ(アービトレーションID 0x1600x161)として送信.
    • 受信: タイマーによって1ミリ秒ごとにCANバスをポーリングし、受信したメッセージの内容に応じてデータをアンパッキング、ROS2トピックにパブリッシュ.
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import can
import struct
import time

class CANNode(Node):
    def __init__(self):
        super().__init__('can_node')

        # CANバスの設定
        try:
            self.bus = can.Bus(interface='socketcan', channel='can0')
        except OSError as e:
            self.get_logger().error(f"Could not access SocketCAN device can0: {e}")
            rclpy.shutdown()
            return

        self.publisher_ = self.create_publisher(Float32MultiArray, 'robot_position', 10)
        self.subscription = self.create_subscription(
            Float32MultiArray,
            'send_can_message',
            self.send_can_message_callback,
            10
        )
        self.timer = self.create_timer(0.001, self.timer_callback)  # 1msに一回
        self.action_number = 0
        self.last_msg_time = self.get_clock().now()

    def send_can_message_callback(self, msg):
        self.get_logger().info(f"send_can_message_callback called with data: {msg.data}")
        self.last_msg_time = self.get_clock().now()
        if len(msg.data) == 4:
            vx = float(msg.data[0])  # Vx
            vy = float(msg.data[1])  # Vy
            omega = float(msg.data[2])  # ω
            action_number = int(msg.data[3])  # 指示番号

            # action_numberを0から10の範囲に制限
            action_number = max(0, min(10,action_number))

            # CANデータのパッキング
            data_160 = struct.pack('>hhh', int(vx), int(vy), int(omega))
            can_msg_160 = can.Message(arbitration_id=0x160, data=data_160, is_extended_id=False)
            try:
                self.bus.send(can_msg_160)  # CANバスに送信
                self.get_logger().info(f"送信[0x160: {data_160.hex()}] 送信[{vx}, {vy}, {omega}]")
            except can.CanError:
                self.get_logger().error("Failed to send CAN message 0x160")

            data_161 = struct.pack('>B', action_number)
            can_msg_161 = can.Message(arbitration_id=0x161, data=data_161, is_extended_id=False)
            try:
                self.bus.send(can_msg_161)  # CANバスに送信
                self.get_logger().info(f"送信[0x161: {data_161.hex()}] 送信[Action Number: {action_number}]")
            except can.CanError:
                self.get_logger().error("Failed to send CAN message 0x161")

    def timer_callback(self):
        msg = self.bus.recv(timeout=0.001)  # タイムアウトを短く設定
        if msg is not None:
            if msg.arbitration_id == 0x150:
                self.get_logger().debug(f"Received CAN message: {msg}")
                try:
                    # CANデータのアンパッキング
                    x = struct.unpack('>h', msg.data[0:2])[0]
                    y = struct.unpack('>h', msg.data[2:4])[0]
                    theta = struct.unpack('>h', msg.data[4:6])[0]
                    command_msg = Float32MultiArray()
                    command_msg.data = [float(x), float(y), float(theta), float(self.action_number)]
                    self.publisher_.publish(command_msg)
                   #self.get_logger().info(f"受信[0x150: {msg.data.hex()}] 受信[{x}, {y}, {theta}, {self.action_number}]")
                except struct.error as e:
                    self.get_logger().error(f"Unpacking error: {e}")
            elif msg.arbitration_id == 0x151:
                self.get_logger().debug(f"Received CAN message: {msg}")
                try:
                    self.action_number = struct.unpack('>B', msg.data)[0]
                   #self.get_logger().info(f"受信[0x151: {msg.data.hex()}] 受信[Action Number: {self.action_number}]")
                except struct.error as e:
                    self.get_logger().error(f"Unpacking error: {e}")
            elif msg.arbitration_id == 0x360:
                self.get_logger().debug(f"Received CAN message: {msg}")
                try:
                    # CANデータのアンパッキング
                    x = struct.unpack('>h', msg.data[0:2])[0]
                    y = struct.unpack('>h', msg.data[2:4])[0]
                    theta = struct.unpack('>h', msg.data[4:6])[0]
                    command_msg = Float32MultiArray()
                    command_msg.data = [float(x), float(y), float(theta), float(self.action_number)]
                    self.publisher_.publish(command_msg)
                   #self.get_logger().info(f"受信[0x360: {msg.data.hex()}] 受信[{x}, {y}, {theta}, {self.action_number}]")
                except struct.error as e:
                    self.get_logger().error(f"Unpacking error: {e}")

def main(args=None):
    rclpy.init(args=args)
    node = CANNode()
    if node.bus:
        rclpy.spin(node)
        node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

プログラム詳細解説

1. CANバスの初期化と接続

try:
    self.bus = can.Bus(interface='socketcan', channel='can0')
except OSError as e:
    self.get_logger().error(f"Could not access SocketCAN device can0: {e}")
    rclpy.shutdown()
    return
  • SocketCANの利用:
    can.Busを利用して、SocketCANインターフェース(can0)に接続します.

2. ROS2ノードとしての設定

  • パブリッシャー:

    self.publisher_ = self.create_publisher(Float32MultiArray, 'robot_position', 10)
    

    CANバスから受信した座標情報(x, y, θ)とアクション番号を、robot_positionトピックにパブリッシュします.

  • サブスクライバー:

    self.subscription = self.create_subscription(
        Float32MultiArray,
        'send_can_message',
        self.send_can_message_callback,
        10
    )
    

    別のノードからsend_can_messageトピック経由で受信したデータをもとに、CANメッセージとして送信する処理を行います.

  • タイマーによる定期処理:

    self.timer = self.create_timer(0.001, self.timer_callback)  # 1msごとに実行
    

    1ミリ秒でタイマーが動作し、CANバスからの受信を定期的にチェックします.

3. CANメッセージの送信処理

サブスクライバーのコールバックsend_can_message_callbackでは、受信したデータのうち、速度(Vx, Vy)、角速度(ω)、および指示番号を取り出し、以下のようにCANメッセージとして送信します.

  • データのパッキング:

    data_160 = struct.pack('>hhh', int(vx), int(vy), int(omega))
    

    3つの値を大きいエンディアンでそれぞれ16ビットの整数としてパッキング.
    同様に、指示番号は1バイトとしてパッキングされます.

  • CANメッセージの送信:
    送信するCANメッセージは、アービトレーションID 0x1600x161 を持ち、各々の送信結果についてログを出力することで、動作の確認が容易になっています.

4. CANメッセージの受信処理

タイマーコールバックtimer_callbackでは、CANバスからの受信をポーリングし、以下のアービトレーションIDに応じた処理を行います.

  • ID 0x150 および 0x360:
    これらのメッセージは、2バイトずつの値(x, y, θ)を含むと仮定し、struct.unpackを使ってアンパッキングした後、robot_positionトピックへパブリッシュします.

  • ID 0x151:
    ここでは、アクション番号を更新するためのメッセージとして処理され、今後の送信データに反映されます.

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?