概要
ROS2 の通信ノードについて、Python を用いた基本的な実装をまとめました。
順次追加予定です。
トピック通信
パブリッシャ
import rclpy
from rclpy.node import Node
from std_msgs.msg import データ型
class パブリッシャのクラス名(Node):
# コンストラクタ
def __init__(self):
super().__init__('ノード名')
# パブリッシャの生成
self.pub = self.create_publisher(データ型, 'トピック名', バッファ数)
# タイマの生成
self.timer = self.create_timer(タイマの間隔(s), self.timer_callback)
# コールバック
def timer_callback(self):
# パブリッシュ
self.pub.publish(メッセージ)
周期的にメッセージをパブリッシュします。
サブスクライバ
import rclpy
from rclpy.node import Node
from std_msgs.msg import データ型
class サブスクライバのクラス名(Node):
# コンストラクタ
def __init__(self):
super().__init__('ノード名')
# サブスクライバの生成
self.sub = self.create_subscription(データ型, 'トピック名',
self.subscribe_callback, バッファ数)
# コールバック
def subscribe_callback(self, msg):
# サブスクライブした時の処理
メッセージをサブスクライブした時、subscribe_callback() 内に実装されている処理が行われます。
サービス通信
サービス
import rclpy
from rclpy.node import Node
class サービスのクラス名(Node):
# コンストラクタ
def __init__(self):
super().__init__('ノード名')
# サービスの生成
self.service = self.create_service(サービス型, 'サービス名', self.service_callback)
# コールバック
def service_callback(self, request, response):
# サービスの処理
return response
# main 関数
def main():
rclpy.init()
node = サービスのクラス名()
try:
rclpy.spin(node)
except KeyboardInterrupt:
print("Abort.")
finally:
rclpy.shutdown()
クライアント
import rclpy
from rclpy.node import Node
class クライアントのクラス名(Node):
# コンストラクタ
def __init__(self):
super().__init__('ノード名')
# クライアントの生成
self.client = self.create_client(サービス型, 'サービス名')
# サービスが利用できるまで待機
while not self.client.wait_for_service(timeout_sec=タイムアウト秒数):
self.get_logger().info('Waiting for service to be available...')
# リクエストのインスタンス生成
self.request = サービス型.Request()
def send_request(self, val):
# リクエストに値を代入
self.request.リクエスト名 = val
# サービスのリクエスト
self.future = self.client.call_async(self.request)
def main(args=None):
rclpy.init(args=args)
node = クライアントのクラス名()
val = リクエストに設定する値
node.send_request(val)
while rclpy.ok():
rclpy.spin_once(node)
# サービスの処理が終了したら
if node.future.done():
try:
# サービスの結果をレスポンスに代入
response = node.future.result()
except Exception as e:
node.get_logger().info(f'Service call failed: {e}')
else:
# 結果の表示
node.get_logger().info(
f'Request: {node.request.リクエストめい} -> Response: {response.レスポンス名}')
break
rclpy.shutdown()