この記事は coins Advent Calendar 2024 14日目、および esys Advent Calendar 2024 13日目の記事です。
今日が16日なので、どちらにも全然間に合ってないですね...すみません...
(ハードウェアが絡むと色々大変なんです... )
自己紹介
まずは軽く自己紹介を。筑波大B1のKengokumaと申します。
esysの人間なのですが、coinsアドカレにも偽coinsとして参戦しています。
これまではロボカップジュニアという自律型ロボットの競技をやっており、ハードウェアから回路、ソフトウェアまで一通り開発していました。
大学に入ってからもサークルでロボコンに出ており、今年9月にはキャチロボ1に参加したりもしました。その開発の中でROSを使うことが多く、今回はその紹介と、簡単な実装例として過去のロボカップジュニア用のロボットをROSで動かしてみた話をしようと思います。
注意
この記事はROSを触り始めてまだ半年の初心者が書いています。
間違っているところがあるかもしれません。
ROS とは
ROS (Robot Operating System) はロボット開発のためのソフトウェアプラットフォームであり、分散システムを実現する上で必須となるものです。ROSにはROS1とROS2の2種類ありますが、今回はROS2についての記事です。
ROSには自律移動や経路計画のための豊富なライブラリやツールが用意されていて、様々なハードウェアで動作する再利用性の高いソフトウェアを開発することができます。
主に使用される言語はC++とPythonです。
ROSの仕組み
ROSのプログラムの最小単位はノードと呼ばれます。ROSシステムには基本的にノードが複数存在し、相互にデータ通信を行うことで複雑な処理を行うことができます。
ROSのノード間はトピックというメッセージを配信/購読することで通信します。このトピックには変数などと同様に型があります。
また、トピックを配信する側をパブリッシャー 、受信する側をサブスクライバーと呼びます。
ROSの動作環境
ROS2はWindowsやMacでも動作するそうですが、基本的にはLinuxで開発するのが良いと思います。筆者は普段、Ubuntu22.04のROS2 Humbleを用いて開発しています。
ROS2のインストールや環境構築方法は、ググればいくらでも出てくるので今回は割愛します。
ROSでロボットを動かそう
ROSの紹介はほどほどにして、早速ロボットを動かしていきたいと思います。
使用するロボットは筆者が今年1月から3月にかけて、ロボカップジュニアのレスキューメイズ競技用に製作したものです。
今回やることは
- PCからロボットを遠隔操作して走らせる
-
ロボットの軌跡を可視化する
の2つです。
ロボットを遠隔操作して走らせる
そもそもの話としてROSはOS上で動作するソフトウェアシステムですが、このロボットはTeensy4.1というマイコンだけで動いているため、OSが走るようなコンピュータは搭載されていません。
また、遠隔操作するというからには無線通信の機能が必要ですが、Teensy4.1は標準で無線通信ができません。
そこで、Wi-FiやBluetoothが使用可能なESP32というマイコンを新たに取り付け、ESP32上でmicro-ROSという組み込み用マイコンで動作するROSを走らせることでこの問題を解決しました。
ということでESP32を取り付けました。こんな感じ↑
Teensy4.1をmicro-ROSが動いているESP32とシリアル通信させ、micro-ROSとPC上のROS間でWi-Fi経由でトピックを用いて通信させようという訳ですね。ブロック図にするとこんな感じです↓
ESP32のmicro-ROSのプログラム
#include <micro_ros_arduino.h>
#include <stdio.h>
#include <rcl/rcl.h>
#include <rcl/error_handling.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>
#include <std_msgs/msg/int32_multi_array.h>
rcl_subscription_t subscriber;
std_msgs__msg__Int32MultiArray sub_msg;
rcl_publisher_t publisher;
std_msgs__msg__Int32MultiArray pub_msg;
rclc_executor_t executor;
rclc_support_t support;
rcl_allocator_t allocator;
rcl_node_t node;
rcl_timer_t timer;
#define LED_PIN 12
#define RCCHECK(fn) \
{ \
rcl_ret_t temp_rc = fn; \
if ((temp_rc != RCL_RET_OK)) { error_loop(); } \
}
#define RCSOFTCHECK(fn) \
{ \
rcl_ret_t temp_rc = fn; \
if ((temp_rc != RCL_RET_OK)) {} \
}
void error_loop() {
while (1) {
}
}
void subscription_callback(const void *msgin) {
const std_msgs__msg__Int32MultiArray *sub_msg = (const std_msgs__msg__Int32MultiArray *)msgin;
int linear = (int)sub_msg->data.data[0];
int angular = (int)sub_msg->data.data[1];
Serial2.println(linear);
Serial2.println(angular);
Serial.println(linear);
Serial.println(angular);
}
void setup() {
Serial2.begin(9600);
Serial.begin(9600);
sub_msg.data.capacity = 100;
sub_msg.data.size = 2;
sub_msg.data.data = (int32_t *)malloc(sub_msg.data.capacity * sizeof(int32_t));
sub_msg.layout.dim.capacity = 100;
sub_msg.layout.dim.size = 0;
sub_msg.layout.dim.data = (std_msgs__msg__MultiArrayDimension *)malloc(sub_msg.layout.dim.capacity * sizeof(std_msgs__msg__MultiArrayDimension));
for (size_t i = 0; i < sub_msg.layout.dim.capacity; i++) {
sub_msg.layout.dim.data[i].label.capacity = 20;
sub_msg.layout.dim.data[i].label.size = 0;
sub_msg.layout.dim.data[i].label.data = (char *)malloc(sub_msg.layout.dim.data[i].label.capacity * sizeof(char));
}
pub_msg.data.capacity = 100;
pub_msg.data.size = 2;
pub_msg.data.data = (int32_t *)malloc(pub_msg.data.capacity * sizeof(int32_t));
pub_msg.layout.dim.capacity = 100;
pub_msg.layout.dim.size = 0;
pub_msg.layout.dim.data = (std_msgs__msg__MultiArrayDimension *)malloc(pub_msg.layout.dim.capacity * sizeof(std_msgs__msg__MultiArrayDimension));
for (size_t i = 0; i < pub_msg.layout.dim.capacity; i++) {
pub_msg.layout.dim.data[i].label.capacity = 20;
pub_msg.layout.dim.data[i].label.size = 0;
pub_msg.layout.dim.data[i].label.data = (char *)malloc(pub_msg.layout.dim.data[i].label.capacity * sizeof(char));
}
set_microros_wifi_transports("<ssid>", "<password>", "<ip-address>", 8888);
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, HIGH);
delay(2000);
allocator = rcl_get_default_allocator();
//create init_options
RCCHECK(rclc_support_init(&support, 0, NULL, &allocator));
// create node
RCCHECK(rclc_node_init_default(&node, "micro_ros_arduino_node", "", &support));
// create subscriber
RCCHECK(rclc_subscription_init_default(
&subscriber,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
"/motor_control"));
RCCHECK(rclc_publisher_init_best_effort(
&publisher,
&node,
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Int32MultiArray),
"/encoder_feedback"));
// create executor
RCCHECK(rclc_executor_init(&executor, &support.context, 1, &allocator));
RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &sub_msg, &subscription_callback, ON_NEW_DATA));
}
void loop() {
if (Serial2.available() > 0) {
int32_t data[2];
for (int i = 0; i < 2; i++) {
while(Serial2.available()==0){}
String received = Serial2.readStringUntil('\n');
data[i] = received.toInt();
pub_msg.data.data[i] = data[i];
Serial.println(data[i]);
}
RCSOFTCHECK(rcl_publish(&publisher, &pub_msg, NULL));
}
delay(10);
RCCHECK(rclc_executor_spin_some(&executor, RCL_MS_TO_NS(100)));
}
以下の部分で/motor_control
トピックを受け取った時の処理が記述されています。ROSから受け取ったモータの速度指令値をそのままTeensy4.1に送っているだけです。
void subscription_callback(const void *msgin) {
const std_msgs__msg__Int32MultiArray *sub_msg = (const std_msgs__msg__Int32MultiArray *)msgin;
int linear = (int)sub_msg->data.data[0];
int angular = (int)sub_msg->data.data[1];
Serial2.println(linear);
Serial2.println(angular);
Serial.println(linear);
Serial.println(angular);
}
Ubuntu PCのROSのプログラム
ROS側ではcontrol_node
というノードを立てています。control_node
はキーボードからの入力を配信するteleop_twist_keyboard
ノードから/cmd_vel
トピックを受け取り、それに応じて速度指令値を/motor_control
トピックに配信するという仕様になっています。
#include "rclcpp/rclcpp.hpp"
#include "geometry_msgs/msg/twist.hpp"
#include "std_msgs/msg/int32_multi_array.hpp"
using std::placeholders::_1;
class Control : public rclcpp::Node {
public:
Control() : Node("control_node") {
motor_publisher_ = this->create_publisher<std_msgs::msg::Int32MultiArray>(
"/motor_control", 10);
motor_subscription_ = this->create_subscription<geometry_msgs::msg::Twist>(
"/cmd_vel", 10, std::bind(&Control::motor_callback, this, _1));
}
private:
rclcpp::Publisher<std_msgs::msg::Int32MultiArray>::SharedPtr motor_publisher_;
rclcpp::Subscription<geometry_msgs::msg::Twist>::SharedPtr motor_subscription_;
void motor_callback(const geometry_msgs::msg::Twist::SharedPtr msg) {
std_msgs::msg::Int32MultiArray array;
array.data.resize(2);
int linear_power = (int)(msg->linear.x * 20);
if (linear_power > 200) {
array.data[0] = 200;
} else if (linear_power < -200) {
array.data[0] = -200;
} else {
array.data[0] = linear_power;
}
int angular_power= (int)(msg->angular.z * 10);
if (angular_power > 150) {
array.data[1] = 150;
} else if (angular_power < -150) {
array.data[1] = -150;
} else {
array.data[1] = angular_power;
}
RCLCPP_INFO(this->get_logger(), "%d %d", array.data[0], array.data[1]);
motor_publisher_->publish(array);
}
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<Control>());
rclcpp::shutdown();
return 0;
}
ロボットの軌跡を可視化する
とりあえず遠隔操作のコードはできたので、今度はロボットの走行軌跡を可視化していきたいと思います。
ROSにはRvizと呼ばれる可視化ツールがあり、トピックを配信することでロボットの姿勢や点群を可視化することができます。
ロボットの軌跡を描画するには、ロボットがどの方向にどれだけ移動したか、つまりタイヤがどれだけ回転したかを知る必要がありますね。そこでエンコーダから回転数を取得して移動量を計算します。これをオドメトリと言います。
ROSはC++に加えてPythonでも書けるため、せっかくなのでオドメトリ用のプログラムはPythonで実装しました。
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Quaternion
from tf_transformations import quaternion_from_euler
from std_msgs.msg import Int32MultiArray
from tf2_ros import TransformBroadcaster
from geometry_msgs.msg import TransformStamped
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import math
class OdometryNode(Node):
def __init__(self):
super().__init__('odometry_node')
# QoSプロファイルを設定
qos_profile = QoSProfile(
history=HistoryPolicy.KEEP_LAST, # 最新のメッセージを保持
depth=10, # 保存するメッセージ数
reliability=ReliabilityPolicy.BEST_EFFORT # ベストエフォート
)
self.declare_parameter('wheel_base', 0.15) # 車輪間距離 (m)
self.declare_parameter('wheel_radius', 0.08) # 車輪半径 (m)
self.wheel_base = self.get_parameter('wheel_base').value
self.wheel_radius = self.get_parameter('wheel_radius').value
self.x = 0.0
self.y = 0.0
self.theta = 0.0
self.left_wheel_ticks = 0.0
self.right_wheel_ticks = 0.0
self.last_left_wheel_ticks = 0.0
self.last_right_wheel_ticks = 0.0
self.last_time = self.get_clock().now()
self.odom_publisher = self.create_publisher(Odometry, '/odom', 10)
self.create_subscription(Int32MultiArray, '/encoder_feedback', self.encoder_callback, qos_profile)
self.tf_broadcaster = TransformBroadcaster(self)
def encoder_callback(self, msg):
self.left_wheel_ticks = msg.data[0]
self.right_wheel_ticks = msg.data[1]
self.publish_odometry()
def compute_odometry(self, delta_time):
# エンコーダから距離変化を計算
delta_left = ((self.left_wheel_ticks - self.last_left_wheel_ticks) / 1000) * self.wheel_radius
delta_right = ((self.right_wheel_ticks - self.last_right_wheel_ticks) / 1000) * self.wheel_radius
self.get_logger().info(f'delta_left: {delta_left} ')
self.get_logger().info(f'delta_right: {delta_right} ')
# 差動2輪のオドメトリ計算
delta_s = (delta_left + delta_right) / 2.0
delta_theta = (delta_right - delta_left) / self.wheel_base
self.get_logger().info(f'delta_s: {delta_s} ')
self.get_logger().info(f'delta_theta: {delta_theta} ')
self.theta += delta_theta
self.x += delta_s * math.cos(self.theta)
self.y += delta_s * math.sin(self.theta)
self.last_left_wheel_ticks = self.left_wheel_ticks
self.last_right_wheel_ticks = self.right_wheel_ticks
def publish_odometry(self):
current_time = self.get_clock().now()
delta_time = (current_time - self.last_time).nanoseconds / 1e9
self.last_time = current_time
self.compute_odometry(delta_time)
# オドメトリメッセージを作成
odom = Odometry()
odom.header.stamp = self.get_clock().now().to_msg()
odom.header.frame_id = "odom"
odom.child_frame_id = "base_link"
# 座標と向き
odom.pose.pose.position.x = self.x
odom.pose.pose.position.y = self.y
odom.pose.pose.position.z = 0.0
# クォータニオン計算と設定
q = quaternion_from_euler(0, 0, self.theta)
odom.pose.pose.orientation.x = q[0]
odom.pose.pose.orientation.y = q[1]
odom.pose.pose.orientation.z = q[2]
odom.pose.pose.orientation.w = q[3]
self.odom_publisher.publish(odom)
# TF情報をブロードキャスト
transform = TransformStamped()
transform.header.stamp = self.get_clock().now().to_msg()
transform.header.frame_id = 'odom'
transform.child_frame_id = 'base_link'
transform.transform.translation.x = self.x
transform.transform.translation.y = self.y
transform.transform.translation.z = 0.0
transform.transform.rotation.x = q[0]
transform.transform.rotation.y = q[1]
transform.transform.rotation.z = q[2]
transform.transform.rotation.w = q[3]
self.tf_broadcaster.sendTransform(transform)
def main(args=None):
rclpy.init(args=args)
node = OdometryNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
ESP32のmicro-ROSから/encoder_feedback
トピックに配信されるタイヤの回転数情報を受け取って、移動量の計算を行っています。
ここまでのシステムの流れをまとめると以下の図のようになります。(実はこの図の出力もROSの機能です)
実際に動かしてみよう
プログラムの説明ばかりしていても面白くないのでロボットを動かしていきましょう!
深夜に何をやっているんですかね... pic.twitter.com/i8o9EgvHBn
— Kengokuma (@kengokuma_ITF24) December 15, 2024
この間に動いた軌跡をRvizで可視化するとこんな感じ↓
何故かはじめに向いてる方向がおかしいですが、その後の旋回や細かな方向の変化もある程度追従できてるようです。まあ及第点でしょう。
最後に
ここまで読んでくださりありがとうございました。
何となくROSが便利なものだと思っていただけたら幸いです。
では、また。
-
毎年京都で開催されているワークハンドリングを競うロボットコンテスト。 ↩