0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Python】ROS2 通信ノード実装

Last updated at Posted at 2024-07-30

概要

ROS2 の通信ノードについて、Python を用いた基本的な実装をまとめました。
順次追加予定です。

トピック通信

パブリッシャ

import rclpy
from rclpy.node import Node
from std_msgs.msg import データ型

class パブリッシャのクラス名(Node):
    # コンストラクタ
    def __init__(self):
        super().__init__('ノード名')
        # パブリッシャの生成
        self.pub = self.create_publisher(データ型, 'トピック名', バッファ数)
        # タイマの生成
        self.timer = self.create_timer(タイマの間隔(s), self.timer_callback)

    # コールバック
    def timer_callback(self):
        # パブリッシュ
        self.pub.publish(メッセージ)

周期的にメッセージをパブリッシュします。

サブスクライバ

import rclpy
from rclpy.node import Node
from std_msgs.msg import データ型

class サブスクライバのクラス名(Node):
    # コンストラクタ
    def __init__(self):
        super().__init__('ノード名')
        # サブスクライバの生成
        self.sub = self.create_subscription(データ型, 'トピック名',
                                            self.subscribe_callback, バッファ数)

    # コールバック
    def subscribe_callback(self, msg):
        # サブスクライブした時の処理

メッセージをサブスクライブした時、subscribe_callback() 内に実装されている処理が行われます。

サービス通信

サービス

import rclpy
from rclpy.node import Node

class サービスのクラス名(Node):
    # コンストラクタ
    def __init__(self):
        super().__init__('ノード名')
        # サービスの生成
        self.service = self.create_service(サービス型, 'サービス名', self.service_callback)

    # コールバック
    def service_callback(self, request, response):
        # サービスの処理
        
        return response

# main 関数
def main():
    rclpy.init()
    node = サービスのクラス名()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        print("Abort.")
    finally:
        rclpy.shutdown()

クライアント

import rclpy
from rclpy.node import Node

class クライアントのクラス名(Node):
    # コンストラクタ
    def __init__(self):
        super().__init__('ノード名')
        # クライアントの生成
        self.client = self.create_client(サービス型, 'サービス名')
        # サービスが利用できるまで待機
        while not self.client.wait_for_service(timeout_sec=タイムアウト秒数):
            self.get_logger().info('Waiting for service to be available...')
        # リクエストのインスタンス生成
        self.request = サービス型.Request()

    def send_request(self, val):
        # リクエストに値を代入
        self.request.リクエスト名 = val
        # サービスのリクエスト
        self.future = self.client.call_async(self.request)

def main(args=None):
    rclpy.init(args=args)
    node = クライアントのクラス名()
    val = リクエストに設定する値
    node.send_request(val)

    while rclpy.ok():
        rclpy.spin_once(node)
        # サービスの処理が終了したら
        if node.future.done():
            try:
                # サービスの結果をレスポンスに代入
                response = node.future.result()
            except Exception as e:
                node.get_logger().info(f'Service call failed: {e}')
            else:
                # 結果の表示
                node.get_logger().info(
                    f'Request: {node.request.リクエストめい} -> Response: {response.レスポンス名}')
                break
    rclpy.shutdown()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?