環境
この記事は以下の環境で動いています。
項目 | 値 |
---|---|
CPU | Core i5-8250U |
Ubuntu | 22.04 |
ROS | Humble |
概要
ROS2には様々な機能があり、それを様々なレイヤーからアクセスすることが出来ます。
プログラムを書くときの参考に記法の一覧をまとめます。
機能の中身は個別の記事に譲ります
一覧表(追記メモ)
rclcpp | rclpy | command line | launch | other tool | |
---|---|---|---|---|---|
msg | |||||
topic (pub,sub,list) | |||||
service | |||||
action | |||||
QoS | |||||
param | |||||
tf | |||||
transform | |||||
quaternion変換 | |||||
log | |||||
diag | |||||
timer | |||||
time(now) | |||||
duration | |||||
node(name,ns,remap) | |||||
rosbag |
memo
rclcpp,rclpy,launch,
command_line,other_tool
cmake
記法一覧
msg
int32/float32等の基本型とTime、Durationの型があります。
他のパッケージのmsgを流用する場合はxxx_msgs/yyyという形で記載します
topic.msg
std_msgs/Header header
builtin_interfaces/Time stamp
builtin_interfaces/Duration stamp
bool active_flag
geometry_msgs/Point point
serviceは---で区切って、前がrequest、後ろがresponseになります。
service.srv
int8
int16
int32
int64
---
uint8
uint16
uint32
uint64
actionは---で3つに区切って、goal、result、feedbackになります。
action.action
float32
---
float64
---
string
基本のIPC
topic/publish
rclcpp_publisher
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
auto node = rclcpp::Node::make_shared("my_publisher");
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr string_pub = node->create_publisher<std_msgs::msg::String>(
"string", 1);
std_msgs::msg::String msg;
string_pub->publish(msg);
rclpy_publisher
from rclpy.node import Node
from std_msgs.msg import String
node = Node()
str_pub = node.create_publisher(String, "topic", 1)
msg = String()
str_pub.publish(msg)
command line
ros2 topic pub /string std_msgs/msg/String '{"data":"message"}'
topic/subscribe
rclcpp_subscriber
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
auto node = rclcpp::Node::make_shared("my_publisher");
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr string_sub = node->create_subscription<std_msgs::msg::String>(
"string", 1, std::bind(&Class::onMsg, this, std::placeholders::_1));
void onMsg(const std_msgs::msg::String::SharedPtr msg) {
std::cout << msg->data << std::endl;
}
callbackの登録はclassでの使用例です。
rclpy_subscriber
from rclpy.node import Node
from std_msgs.msg import String
node = Node()
str_sub = node.create_subscription(String, "topic", string_callback, 1)
def string_callback(self, msg):
print("msg", msg)
command line
ros2 topic echo /string
topic/list
rclcpp
std::map<std::string,std::vector<std::string>> topic_map = node_ptr_->get_topic_names_and_types();
// topic_name, [topic_type]のmap
command line
ros2 topic list
service/client
rclcpp_client
#include <std_srvs/srv/set_bool.hpp>
auto node = rclcpp::Node::make_shared("my_publisher");
rclcpp::Client<std_srvs::srv::SetBool>::SharedPtr bool_client = node->create_client<std_srvs::srv::SetBool>("set_bool");
if (!bool_client->service_is_ready()) {
RCLCPP_WARN(node->get_logger(), "mode service not connect");
onSetupComplete(false);
} else {
auto request = std::make_shared<std_srvs::srv::SetBool::Request>();
request->data = true;
auto future = request_mode_client_->async_send_request(request, std::bind(&MyNode::onResponse, this, std::placeholders::_1));
}
void onResponse(rclcpp::Client<std_srvs::srv::SetBool>::SharedFuture future) {
if (!future.get()->success) {
RCLCPP_WARN(get_logger(), "set mode fail");
}
}
command_line
ros2 service call /set_bool std_srvs/srv/SetBool `{"data":true}`
service/server
rclcpp_server
#include <std_srvs/srv/set_bool.hpp>
auto node = rclcpp::Node::make_shared("my_publisher");
rclcpp::Service<std_srvs::srv::SetBool>::SharedPtr bool_srv = node->create_service<std_srvs::srv::SetBool>("/set_bool", std::bind(&MyNode::onRequest, this, std::placeholders::_1, std::placeholders::_2));
void onRequest(const std::shared_ptr<std_srvs::srv::SetBool::Request> request, std::shared_ptr<std_srvs::srv::SetBool::Response> response) {
}
rclpy_server
from std_srvs.srv import SetBool
node = Node()
set_bool_srv = node.create_service(SetBool, "service_name", self.set_bool_callback)
def set_bool_callback(self, request, response):
response.success = True
action/client
action/server
parameter
Basic
rclcpp
ros_node_->declare_parameter("param_key", 1.0f);
float value = ros_node_->get_parameter("param_key").as_double();
型ごとの取得APIは以下
型 | 単体 | リスト |
---|---|---|
uint8_t | -- | as_byte_array() |
bool | as_bool() | as_bool_array() |
int64_t | as_int() | as_integer_arra() |
double | as_double() | as_double_array() |
std::string | as_string() | as_string_array() |
rclpy
self.declare_parameter('param_key', "abc")
my_param = self.get_parameter('param_key').get_parameter_value().string_value
declare_parameterで型を指定する必要がある。空リストはstringのリストと見なされるので、それ以外のリストの場合はダミーの値を入れる必要がある。
launch
Node(
package="xxx_pkg",
executable="xxx_node",
parameters=[
param_file.yaml,
{
"param_key1":value,
"param_key2":value
}
],
)
command_line(run)
ros2 run xxx_pkg xxx_node -ros-args -p <param_name>:=<value>
ros2 run xxx_pkg xxx_node -ros-args --params-file <param_file.yaml>
command_line(get)
ros2 param list
ros2 param dump <node_name>
param_file.yaml
node_handle_name1:
ros__parameters:
int_value1: 12
structure_valie:
string_value: "abcd"
list_value: [1,2,3]
node_handle_name2:
ros__parameters:
int_value1: 12
ノード毎に指定します(ROS2は1プロセス中に複数のノードを起動できる)
param_file.yaml
/**:
ros__parameters:
int_value1: 12
上記のようにワイルドカードも指定できます。
callback
自分のパラメーターの変更を監視(rclpy)
def parameters_callback(self, params):
for param in params:
print(param.name, param.value)
return SetParametersResult(successful=True)
Node.add_on_set_parameters_callback(self.parameters_callback)
時間系
Timer
rclcpp
auto node = rclcpp::Node::make_shared("my_timer");
rclcpp::TimerBase::SharedPtr interval_timer = node->create_wall_timer(100ms, std::bind(&MyClass::onIntervalTimer, this));
rclpy
node = Node()
interval_timer = node.create_timer(0.5, timer_callback)
def timer_callback():
...
Time/Duration
rclcpp
rclcpp::Time ros_now = node->now();
rclcpp::Duration duration = rclcpp::Duration::from_seconds(0.1);
std_msgs::String msg;
msg.header.stamp = (ros_now + duration);
double time_double = ros_now.nanoseconds() * 1e-9;
double duration_double = duration.seconds();
rclpy
from rclpy import time
ros_now = node.get_clock().now()
duration = rclpy.duration.Duration(seconds=1.0, nanoseconds=1000000000)
msg = std_msgs.msg.String()
msg.header.stamp = (ros_now + duration).to_msg()
time = time.Time.from_msg(msg.header.stamp)
log
rclcpp
auto node = std::make_shared<Node>("node")
RCLCPP_INFO(node->get_logger(), "message");
RCLCPP_INFO_THROTTLE(node->get_logger(), *(node->get_clock()), 1000, "message every 1000ms");
RCLCPP_INFO_SKIPFIRST_THROTTLE(node->get_logger(), *(node->get_clock()), 5000, "message every 1000ms skip first");
rclcpp::Logger logger = rclcpp::get_logger("logger");
RCLCPP_INFO(logger, "message");
rclpy
node.get_logger().info('message')
node.get_logger().error('message every 1s', throttle_duration_sec=1)
node.get_logger().error('message every 1s', throttle_duration_sec=1, skip_first=True)
座標系
broadcast
rclcpp
#include <tf2_ros/transform_broadcaster.h>
tf_broadcaster = std::make_unique<tf2_ros::TransformBroadcaster>(node);
geometry_msgs::msg::TransformStamped transformStamped;
transformStamped.header = odom.header;
transformStamped.child_frame_id = odom.child_frame_id;
transformStamped.transform.translation.x = odom.pose.pose.position.x;
transformStamped.transform.translation.y = odom.pose.pose.position.y;
transformStamped.transform.translation.z = odom.pose.pose.position.z;
transformStamped.transform.rotation = odom.pose.pose.orientation;
tf_broadcaster->sendTransform(transformStamped);
command line
ros2 run tf2_ros static_transform_publisher --frame-id map --child-frame-id odom
listen
listener
from_frame_rel = "odom"
to_frame_rel = "base_link"
try:
latest_ros_time = self.tf_buffer.get_latest_common_time(from_frame_rel, to_frame_rel)
t = self.tf_buffer.lookup_transform(
from_frame_rel,
to_frame_rel,
latest_ros_time)
except TransformException as ex:
self.get_logger().info(f'Could not transform {from_frame_rel} to {to_frame_rel}: {ex}')
return
self.get_logger().info(f'transform {t}')
rclcpp
#include <tf2_ros/transform_listener.h>
std::unique_ptr<tf2_ros::Buffer> tf_buffer = std::make_unique<tf2_ros::Buffer>(node->get_clock());
std::shared_ptr<tf2_ros::TransformListener> tf_listener = std::make_shared<tf2_ros::TransformListener>(*tf_buffer);
try {
std::string error_str;
if (tf_buffer_->canTransform(source_frame_id, target_frame_id, tf2::TimePointZero, &error_str)) {
return tf_buffer_->lookupTransform(source_frame_id, target_frame_id, tf2::TimePointZero);
}
} catch (const tf2::TransformException & ex) {
}
command line
ros2 run tf2_ros tf2_echo map base_link
transform
quaternion
実行系
ノード実行
command_lineの基本
ros2 run pkg_name node_name
command_lineパラメーター付き
ros2 run pkg_name node_name --ros-args -r topic:=new_topic
- トピック名リネーム
-r original_topic:=new_topic
- ノード名リネーム
-r __node:=new_node_name
- ネームスペース名リネーム
-r __ns:=new_name_space_name
- パラメーター設定
-p param_name:=param_value
- パラメーターファイルの適用
--params-file params_file.yaml
複数設定する時は--ros-args -r a:=aa -r b:=bb
と記載します。
launch_argument
launch_argument宣言&使用
def generate_launch_description():
return LaunchDescription(
[
DeclareLaunchArgument('test_arg', default_value='true'),
Node(
package="xxx_pkgs",
executable="xxx_node",
condition=IfCondition(LaunchConfiguration('test_arg')),
),
]
)
launch_argumentを呼び出し
def generate_launch_description():
return LaunchDescription(
[
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
get_package_share_directory("xxx_pkgs"),
"/launch/xxx.launch.py",
]
),
launch_arguments={"test_arg": "false"}.items(),
),
]
)
command_line
ros2 launch xxx_pkgs xxx.launch.py test_arg:=false