#プログラミング ROS< センサとアクチュエータの導入 >
##はじめに
1つの参考書に沿って,ROS(Robot Operating System)を難なく扱えるようになることが目的である.その第27弾として,「センサとアクチュエータの導入」を扱う.
##環境
#####仮想環境
ソフト | VMware Workstation 15 |
実装RAM | 2 GB |
OS | Ubuntu 64 ビット |
isoファイル | ubuntu-mate-20.04.1-desktop-amd64.iso |
#####コンピュータ
デバイス | MSI |
プロセッサ | Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz 2.50GHz |
実装RAM | 8.00 GB (7.89 GB 使用可能) |
OS | Windows (Windows 10 Home, バージョン:20H2) |
#####ROS
Distribution | noetic |
プログラミング言語 | Python 3.8.5 |
##センサとアクチュエータの導入
ROSはよく知られたセンサやアクチュエータを幅広くカバーしているが,すべてをカバーしているわけではない.新しいハードウェアが出てくれば,自分でそれをROSに取り組む必要がある.そうすることで,コミュニティ全体でそれを使えるようになる.
☆ROSのエコシステムに新しいセンサやアクチュエータを統合する方法を学ぶ
<作業>
デバイスにアクセスするのに使っているAPIを包むROSのラッパーを書くことなどが主な作業
##センサ
###概要
#####制約
・センサはすでに測定値を取得するためのPythonのAPIを持っている(API:Application Programming Interface)
・配線はすべて正しくされており,APIを使うことでセンサから値を正しく読み取ることができている
この制約に従っている限り,センサは機能していることになるので,うまく動作しないことは
すべてROSのラッパーの問題となり,デバッグが楽になる.
#####擬似センサ
単純なAPIを持つ擬似センサ(FakeSensorと呼ぶ)を使っていく
Pythonのクラスは実際のセンサをシミュレートし,これを用いることで,ハードウェアを購入しなくても,センサをROSに組み込む方法を学ぶことができる(PySideというGUIライブラリ使用)
#####ラッパーを設計
設計仕様の決定項目
・センサの測定値をトピック上で送るべきか,それともサービスやアクションを用いて要求されたときにのみ測定値を与えるようにするか
・センサからのデータを取得する方法.(センサによっては1つしかないものもある)
・作成したラッパーが送信するROSメッセージ型を決める.汎用性も考慮するとよい.
###実装
エラーによりうまく動かなかった部分もあるが,それに関しても後述する.
#####ソースコード:トピックに測定値を定期的に送る
#! /usr/bin/env python3
"""
topicを通して定期的に測定値を送る
"""
from math import pi
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート
import rospy
import tf
from geometry_msgs.msg import Quaternion
def make_quaternion(angle):
"""ヨー角(ラジアン)からQuaternionに変換する"""
q = tf.transformations.quaternion_from_euler(0, 0, angle) #オイラー角から四元数に変換
return Quaternion(*q) #四元数の形で出力
if __name__ == '__main__':
fake_sensor = FakeSensor() #FakeSensorクラスのインスタンスを作成
rospy.init_node('fake_sensor')
pub = rospy.Publisher('angle', Quaternion, queue_size = 10)
rate = rospy.Rate(10.0) #配信周期の設定 (10Hz)
while not rospy.is_shutdown(): #ノードが終了するまでループ
angle = fake_sensor.sensor.value() * 2 * pi / 100.0
#value()はActualFakeSensorクラスにあるが,FakeSensorクラス中ではself.sensorに
#ActualFakeSensorクラスのインスタンスを作成しているため,アクセスするときには,
#FakeSensor().sensor.value()とすればいい
q = make_quaternion(angle)
pub.publish(q)
rate.sleep()
#####ソースコード:ストリーミングされた計測値をトピックに送る
#! /usr/bin/env python3
"""
計測値を固定周期で配信する
"""
from math import pi
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート
import rospy
import tf
from geometry_msgs.msg import Quaternion
from time import sleep
def make_quaternion(angle):
"""ヨー角(ラジアン)からQuaternionに変換する"""
q = tf.transformations.quaternion_from_euler(0, 0, angle) #オイラー角から四元数に変換
return Quaternion(*q) #四元数の形で出力
def publish_value(value):
angle = value* 2 * pi / 100.0
q = make_quaternion(angle)
pub.publish(q)
if __name__ == '__main__':
fake_sensor = FakeSensor() #FakeSensorクラスのインスタンスを作成
sleep(2) #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう
sensor = fake_sensor.sensor
sensor.register_callback(publish_value)
rospy.init_node('fake_sensor')
pub = rospy.Publisher('angle', Quaternion, queue_size = 10)
#####出力
Segmentation fault (core dumped)
このようなエラーがでた.ネット上にあるたくさんの情報をもとに解決を図ったが,それでもうまく解決することができなかった.(解決したときには,修正することとする.)
#####ソースコード:ストリーミングされた計測値を固定周期で配信する
#! /usr/bin/env python3
"""
計測値を固定周期で配信する
"""
from math import pi
from threading import Lock #Lockは同時実行を防ぐことができる
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート
import rospy
import tf
from geometry_msgs.msg import Quaternion
from time import sleep
def make_quaternion(angle):
"""ヨー角(ラジアン)からQuaternionに変換する"""
q = tf.transformations.quaternion_from_euler(0, 0, angle) #オイラー角から四元数に変換
return Quaternion(*q) #四元数の形で出力
def save_value(value):
"""現在の角度の値(センサの測定値)を格納するだけ"""
global angle
with lock: #angleのロックを取得
angle = value* 2 * pi / 100.0
if __name__ == '__main__':
lock = Lock() #angleのロックを作成し,同時アクセスを防ぐ
fake_sensor = FakeSensor() #FakeSensorクラスのインスタンスを作成
sleep(2) #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう
sensor = fake_sensor.sensor
sensor.register_callback(save_value)
rospy.init_node('fake_sensor')
pub = rospy.Publisher('angle', Quaternion, queue_size = 10)
global angle
angle = None
rate = rospy.Rate(10.0) #配信周期の設定 (10Hz)
while not rospy.is_shutdown():
with lock: #angleのロックを取得
if angle: #コールバックがすでに値をangleに代入していたとすると,True,実行されていないとFalse
q = make_quaternion(angle)
pub.publish(q)
rate.sleep()
#####出力
値を変えるまでは動いていないことが分かる.
ターミナルを終了後には先ほど同様のエラーが現れる.(Segmentation fault (core dumped)
)
#####ソースコード:センサの測定値を要求に応じて送る
ここでは,サービスの定義をして,それをプログラムで用いた.
---
geometry_msgs/Quaternion quaternion
入力をstd_msgs/Empty
と明示的に定義することもできるようだが,catkin_make
実行時にエラーが現れたため,省略することで,入力のないサービスコールを定義している.
#! /usr/bin/env python3
"""
センサの計測値を要求に応じて送る
self.
センサから測定値を高速に得られる場合 → サービスを使用
そうでない場合 → アクションコールを使用
"""
from math import pi
from fake_sensor import FakeSensor as FakeSensorUI #サービスの名前とややこしいから改名
import rospy
import tf
from geometry_msgs.msg import Quaternion
from sensor_actuator.srv import FakeSensor, FakeSensorResponse
from time import sleep
def make_quaternion(angle):
"""ヨー角(ラジアン)からQuaternionに変換する"""
q = tf.transformations.quaternion_from_euler(0, 0, angle) #オイラー角から四元数に変換
return Quaternion(*q) #四元数の形で出力
def callback(request):
angle = sensor.value()* 2 * pi / 100.0
q = make_quaternion(angle)
return FakeSensorResponse(q)
if __name__ == '__main__':
sensor = FakeSensorUI() #FakeSensorクラスのインスタンスを作成
#sleep(2) #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう
rospy.init_node('fake_sensor')
service = rospy.Service('angle', FakeSensor, callback)
#####出力
Segmentation fault (core dumped)
ここでもまた同様のエラーが出てしまった.
#####ソースコード:センサの定義
参考文献において,作成されていたデモ用センサのソースコードを示す.一部バージョンの違いなどによるエラーが見受けられたため,改編している箇所がある.それについてはソースコードを示した後,記述する.
#!/usr/bin/env python3
import sys
import threading
# Import the core and GUI elements of Qt
from PySide6.QtCore import *
from PySide6.QtGui import *
from PySide6.QtWidgets import * #QApplicationを含むライブラリ
sensor = None
class ActualFakeSensor:
def __init__(self, callback=None):
self.callback = callback
self.app = QApplication(sys.argv)
self.layout = QVBoxLayout()
self.dial = QDial()
self.dial.valueChanged.connect(self.dial_callback)
self.dial.setNotchesVisible(True)
self.dial.setWrapping(True)
self.layout.addWidget(self.dial)
self.quit = QPushButton('Quit')
self.quit.clicked.connect(self.app.quit)
self.layout.addWidget(self.quit)
self.group = QGroupBox('Fake Sensor')
self.group.setLayout(self.layout)
def dial_callback(self, value):
if self.callback:
self.callback(self.value())
def register_callback(self, callback):
self.callback = callback
def value(self):
return self.dial.value()
def set_value(self, value):
self.dial.setValue(value)
def run(self):
self.group.show()
self.app.exec_()
class FakeSensor:
def __init__(self):
thread = threading.Thread(target=self.run)
thread.daemon = True
thread.start()
def run(self):
self.sensor = ActualFakeSensor()
self.sensor.run()
if __name__ == '__main__':
sensor = FakeSensor()
from time import sleep
while True:
print('x')
sleep(1)
改編箇所は3か所ある.1つ目は他のファイルも同様だが,Python3を使っているため,#!/usr/bin/env python3
としている.2つ目と3つ目はプログラムに使うライブラリのインポート部分である.PySide
にはいくつかバージョンがあり,そのバージョンに応じてサポートしているPythonのバージョンも変わる.はじめPySide
となっていたが,これはPython3.5あたりまでのサポートのようで,それ以降はhttps://pypi.org/project/PySide6/ などを参考に必要なバージョンをインストールする必要がある.ここでは,Python3.85に適したPySide6
を取り入れた.3つ目は,from PySide6.QtWidgets import *
の追記である.どうやらPySide6
では,QApplication
という関数はQtWidgets
の中にあるようだ.(https://medium.com/weekly-python/getting-started-writing-qt-6-applications-in-python-with-pyside6-389ee4c384ee)
##アクチュエータ
センサと変わらない.APIを熟知したうえでラッパーを設計する.
ここでは,1つのプログラムの中に,3つのアクチュエーションを実装している.それらについて1つ1つ特徴を見てから実装を示す.
###特徴
#####連続的なアクチュエーション(トピック)
次の要求が来る前に,今の要求を完了可能である(アクチュエータによるバッファリングはされない)必要がある.そうでない場合には,ほかの設計のほうが適切.
#####不定期で瞬間的なアクチュエーション(サービス)
トピックを介して送られたメッセージが受信されなかったり,実行されない可能性が常にある(確率は低い).ROSトピックは,送信を保証するTCPソケット上に作られているが,購読する側のバッファーがオーバーフローした場合には,メッセージが失われるかもしれない.たまにしかコマンドを送信しない場合には,コマンドのうちの1つが失われてしまうのは大きな問題で,コマンドがすべて伝わったかどうか確認したくなる.この場合には,サービスコールを使用すべき.これにより,アクチュエータにコマンドを配信し,確認応答を取得するまで待機することができる.しかし,コマンドの実行に時間がかかったり,終了するまで待ちたくない場合には,問題となる可能性がある.⇒アクションインタフェースを使用.
#####不定期で時間のかかるアクチュエーション(アクション)
アクションインタフェースはサービスインタフェースに似ているが,非同期であるから,呼び出し側のノードがコマンドの終了を待つ必要がない.ナビゲーションスタックがそのよい例である.ロボットが目標位置に移動するまで待っていたくはないが,ロボットがいつそこに着いたかは知りたい.
###実装
ここでは,srvファイルとactionファイル1つずつ定義したものを使用している.
bool on
---
bool status
float32 orientation
---
float32 final_orientation
---
float32 current_orientation
#####ソースコード
#!/usr/bin/env python3
from fake_actuator import FakeActuator
import rospy
import actionlib
from std_msgs.msg import Float32
# BEGIN IMPORT
from sensor_actuator.srv import Light,LightResponse
from sensor_actuator.msg import RotationAction,RotationFeedback,RotationResult
# END IMPORT
##トピック:連続なアクチュエーション
# BEGIN TOPIC_CALLBACK
def volume_callback(msg):
actuator.set_volume(min(100, max(0, int(msg.data * 100))))
# END TOPIC_CALLBACK
##サービス:不定期で瞬間的なアクチュエーション
# BEGIN SERVICE_CALLBACK
def light_callback(request):
actuator.toggle_light(request.on)
return LightResponse(actuator.light_on())
# END SERVICE_CALLBACK
##アクション:不定期で時間のかかるアクチュエーション
# BEGIN ACTION_CALLBACK
def rotation_callback(goal):
feedback = RotationFeedback()
result = RotationResult()
actuator.set_position(goal.orientation)
success = True
rate = rospy.Rate(10)
while fabs(goal.orientation - actuator.position()) > 0.01:
if a.is_preempt_requested():
success = False
break;
feedback.current_orientation = actuator.position()
a.publish_feedback(feedback)
rate.sleep()
result.final_orientation = actuator.position()
if success:
a.set_succeeded(result)
else:
a.set_preempted(result)
# END ACTION_CALLBACK
if __name__ == '__main__':
actuator = FakeActuator() #FakeActuatorのクラスインスタンスを作成
# Initialize the node
rospy.init_node('fake')
# Topic for the volume
t = rospy.Subscriber('fake/volume', Float32, volume_callback) #音量コマンドのためのトピックを購読
# Service for the light
s = rospy.Service('fake/light', Light, light_callback) #ライトを制御するサービスを公開
# Action for the position
a = actionlib.SimpleActionServer('fake/position', RotationAction, #回転位置を制御するアクションを公開
execute_cb=rotation_callback,
auto_start=False)
a.start()
# Start everything
rospy.spin()
# END ALL
トピックを購読できている
サービスがうまく働いている.
アクションについてもうまく反映されている.
#####ソースコード:アクチュエータの定義
参考文献において,作成されていたデモ用アクチュエータのソースコードを示す.改編箇所はセンサのときと同じであるから説明は省く.
#!/usr/bin/env python3
import sys
from PySide6.QtCore import *
from PySide6.QtGui import *
from PySide6.QtWidgets import * #QApplicationを含むライブラリ
class FakeActuator:
def __init__(self, callback=None):
self.callback = callback
self.app = QApplication(sys.argv)
self.vlayout = QVBoxLayout()
self.button = QPushButton('light')
self.button.pressed.connect(self._button_callback)
self.button.setCheckable(True)
self.button.setChecked(True)
self.button.setStyleSheet('background-color: white')
self.vlayout.addWidget(self.button)
self.slider = QSlider()
self.slider.setOrientation(Qt.Horizontal)
self.vlayout.addWidget(self.slider)
self.dial = QDial()
self.dial.setNotchesVisible(True)
self.dial.setWrapping(True)
self.vlayout.addWidget(self.dial)
self.quit = QPushButton('Quit')
self.quit.clicked.connect(self.app.quit)
self.vlayout.addWidget(self.quit)
self.group = QGroupBox('Fake Actuator')
self.group.setLayout(self.vlayout)
def _button_callback(self):
if self.button.isChecked():
self.button.setStyleSheet('background-color: red')
else:
self.button.setStyleSheet('background-color: white')
def light_on(self):
return self.button.checked()
def toggle_light(self, on):
self.button.setChecked(on)
def volume(self):
return self.slider.value()
def set_volume(self, value):
self.slider.setValue(value)
def position(self):
return self.dial.value()
def set_position(self, value):
self.dial.setValue(value)
def run(self):
self.group.show()
self.app.exec_()
if __name__ == '__main__':
a = FakeActuator()
a.run()
##感想
私がROSコミュニティに貢献できるほどの力を持っているかは別として,今回は何かしら新たなものを構築できたときに反映させるような概要を学んだ.それを通して,トピック・サービス・アクションについての特徴と使い分けについても改めて学ぶことができた.また,非常に恩恵を受けているROSコミュニティは,たくさんの貢献する人々によって構築され,ROSを強化しているということを知るだけでもROSを使っていくにあたって大切だと思う.
##参考文献
プログラミングROS Pythonによるロボットアプリケーション開発
Morgan Quigley, Brian Gerkey, William D.Smart 著
河田 卓志 監訳
松田 晃一,福地 正樹,由谷 哲夫 訳
オイラリー・ジャパン 発行