LoginSignup
0
1

More than 3 years have passed since last update.

ROSの勉強 第27弾:センサとアクチュエータの導入

Last updated at Posted at 2021-03-23

#プログラミング ROS< センサとアクチュエータの導入 >

はじめに

1つの参考書に沿って,ROS(Robot Operating System)を難なく扱えるようになることが目的である.その第27弾として,「センサとアクチュエータの導入」を扱う.

環境

仮想環境
ソフト VMware Workstation 15
実装RAM 2 GB
OS Ubuntu 64 ビット
isoファイル ubuntu-mate-20.04.1-desktop-amd64.iso
コンピュータ
デバイス MSI
プロセッサ Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz 2.50GHz
実装RAM 8.00 GB (7.89 GB 使用可能)
OS Windows (Windows 10 Home, バージョン:20H2)
ROS
Distribution noetic
プログラミング言語 Python 3.8.5

センサとアクチュエータの導入

ROSはよく知られたセンサやアクチュエータを幅広くカバーしているが,すべてをカバーしているわけではない.新しいハードウェアが出てくれば,自分でそれをROSに取り組む必要がある.そうすることで,コミュニティ全体でそれを使えるようになる.
☆ROSのエコシステムに新しいセンサやアクチュエータを統合する方法を学ぶ
<作業>
デバイスにアクセスするのに使っているAPIを包むROSのラッパーを書くことなどが主な作業

センサ

概要

制約

・センサはすでに測定値を取得するためのPythonのAPIを持っている(API:Application Programming Interface)
・配線はすべて正しくされており,APIを使うことでセンサから値を正しく読み取ることができている

この制約に従っている限り,センサは機能していることになるので,うまく動作しないことは
すべてROSのラッパーの問題となり,デバッグが楽になる.

擬似センサ

単純なAPIを持つ擬似センサ(FakeSensorと呼ぶ)を使っていく
Pythonのクラスは実際のセンサをシミュレートし,これを用いることで,ハードウェアを購入しなくても,センサをROSに組み込む方法を学ぶことができる(PySideというGUIライブラリ使用)

ラッパーを設計

設計仕様の決定項目
・センサの測定値をトピック上で送るべきか,それともサービスやアクションを用いて要求されたときにのみ測定値を与えるようにするか
・センサからのデータを取得する方法.(センサによっては1つしかないものもある)
・作成したラッパーが送信するROSメッセージ型を決める.汎用性も考慮するとよい.

実装

エラーによりうまく動かなかった部分もあるが,それに関しても後述する.

ソースコード:トピックに測定値を定期的に送る
topic_sensor.py
#! /usr/bin/env python3
"""
topicを通して定期的に測定値を送る
"""
from math import pi
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート

import rospy
import tf
from geometry_msgs.msg import Quaternion

def make_quaternion(angle):
    """ヨー角(ラジアン)からQuaternionに変換する"""
    q = tf.transformations.quaternion_from_euler(0, 0, angle)   #オイラー角から四元数に変換
    return Quaternion(*q)   #四元数の形で出力

if __name__ == '__main__':
    fake_sensor = FakeSensor()  #FakeSensorクラスのインスタンスを作成

    rospy.init_node('fake_sensor')

    pub = rospy.Publisher('angle', Quaternion, queue_size = 10)

    rate = rospy.Rate(10.0) #配信周期の設定 (10Hz)

    while not rospy.is_shutdown():  #ノードが終了するまでループ
        angle = fake_sensor.sensor.value() * 2 * pi / 100.0
        #value()はActualFakeSensorクラスにあるが,FakeSensorクラス中ではself.sensorに
        #ActualFakeSensorクラスのインスタンスを作成しているため,アクセスするときには,
        #FakeSensor().sensor.value()とすればいい

        q = make_quaternion(angle)

        pub.publish(q)

        rate.sleep()
出力

ezgif-3-7e800d090aea

ソースコード:ストリーミングされた計測値をトピックに送る
topic_sensor2.py
#! /usr/bin/env python3
"""
計測値を固定周期で配信する
"""
from math import pi
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート

import rospy
import tf
from geometry_msgs.msg import Quaternion
from time import sleep
def make_quaternion(angle):
    """ヨー角(ラジアン)からQuaternionに変換する"""
    q = tf.transformations.quaternion_from_euler(0, 0, angle)   #オイラー角から四元数に変換
    return Quaternion(*q)   #四元数の形で出力

def publish_value(value):
    angle = value* 2 * pi / 100.0
    q = make_quaternion(angle)
    pub.publish(q)

if __name__ == '__main__':
    fake_sensor = FakeSensor()  #FakeSensorクラスのインスタンスを作成
    sleep(2)    #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう
    sensor = fake_sensor.sensor
    sensor.register_callback(publish_value)

    rospy.init_node('fake_sensor')
    pub = rospy.Publisher('angle', Quaternion, queue_size = 10)
出力
Segmentation fault (core dumped)

このようなエラーがでた.ネット上にあるたくさんの情報をもとに解決を図ったが,それでもうまく解決することができなかった.(解決したときには,修正することとする.)

ソースコード:ストリーミングされた計測値を固定周期で配信する
topic_sensor3.py
#! /usr/bin/env python3
"""
計測値を固定周期で配信する
"""
from math import pi
from threading import Lock  #Lockは同時実行を防ぐことができる
from fake_sensor import FakeSensor #作成されたfake_sensor.pyのクラスFakeSensorをインポート

import rospy
import tf
from geometry_msgs.msg import Quaternion
from time import sleep
def make_quaternion(angle):
    """ヨー角(ラジアン)からQuaternionに変換する"""
    q = tf.transformations.quaternion_from_euler(0, 0, angle)   #オイラー角から四元数に変換
    return Quaternion(*q)   #四元数の形で出力

def save_value(value):
    """現在の角度の値(センサの測定値)を格納するだけ"""
    global angle
    with lock:  #angleのロックを取得
        angle = value* 2 * pi / 100.0

if __name__ == '__main__':
    lock = Lock()     #angleのロックを作成し,同時アクセスを防ぐ
    fake_sensor = FakeSensor()  #FakeSensorクラスのインスタンスを作成
    sleep(2)    #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう

    sensor = fake_sensor.sensor
    sensor.register_callback(save_value)

    rospy.init_node('fake_sensor')

    pub = rospy.Publisher('angle', Quaternion, queue_size = 10)

    global angle
    angle = None
    rate = rospy.Rate(10.0) #配信周期の設定 (10Hz)

    while not rospy.is_shutdown():
        with lock:  #angleのロックを取得
            if angle:   #コールバックがすでに値をangleに代入していたとすると,True,実行されていないとFalse
                q = make_quaternion(angle)
                pub.publish(q)

        rate.sleep()
出力

ezgif-3-eb4e8fdc1390
値を変えるまでは動いていないことが分かる.
ターミナルを終了後には先ほど同様のエラーが現れる.(Segmentation fault (core dumped))

ソースコード:センサの測定値を要求に応じて送る

ここでは,サービスの定義をして,それをプログラムで用いた.

FakeSensor.srv

---
geometry_msgs/Quaternion quaternion

入力をstd_msgs/Emptyと明示的に定義することもできるようだが,catkin_make実行時にエラーが現れたため,省略することで,入力のないサービスコールを定義している.

service_sensor.py
#! /usr/bin/env python3
"""
センサの計測値を要求に応じて送る
self.
センサから測定値を高速に得られる場合 → サービスを使用
                  そうでない場合 → アクションコールを使用
"""

from math import pi
from fake_sensor import FakeSensor as FakeSensorUI #サービスの名前とややこしいから改名

import rospy
import tf

from geometry_msgs.msg import Quaternion
from sensor_actuator.srv import FakeSensor, FakeSensorResponse
from time import sleep

def make_quaternion(angle):
    """ヨー角(ラジアン)からQuaternionに変換する"""
    q = tf.transformations.quaternion_from_euler(0, 0, angle)   #オイラー角から四元数に変換
    return Quaternion(*q)   #四元数の形で出力

def callback(request):
    angle = sensor.value()* 2 * pi / 100.0
    q = make_quaternion(angle)

    return FakeSensorResponse(q)


if __name__ == '__main__':
    sensor = FakeSensorUI()  #FakeSensorクラスのインスタンスを作成
    #sleep(2)    #少し待たないと,FakeSensorの中で別のクラスを呼び出すのが間に合わなくなりエラーが出てしまう

    rospy.init_node('fake_sensor')

    service = rospy.Service('angle', FakeSensor, callback)
出力
Segmentation fault (core dumped)

ここでもまた同様のエラーが出てしまった.

ソースコード:センサの定義

参考文献において,作成されていたデモ用センサのソースコードを示す.一部バージョンの違いなどによるエラーが見受けられたため,改編している箇所がある.それについてはソースコードを示した後,記述する.

fake_sensor.py
#!/usr/bin/env python3

import sys
import threading

# Import the core and GUI elements of Qt
from PySide6.QtCore import *
from PySide6.QtGui import * 
from PySide6.QtWidgets import * #QApplicationを含むライブラリ

sensor = None

class ActualFakeSensor:
    def __init__(self, callback=None):
        self.callback = callback

        self.app = QApplication(sys.argv)
        self.layout = QVBoxLayout()

        self.dial = QDial()
        self.dial.valueChanged.connect(self.dial_callback)
        self.dial.setNotchesVisible(True)
        self.dial.setWrapping(True)
        self.layout.addWidget(self.dial)

        self.quit = QPushButton('Quit')
        self.quit.clicked.connect(self.app.quit)
        self.layout.addWidget(self.quit)

        self.group = QGroupBox('Fake Sensor')
        self.group.setLayout(self.layout)

    def dial_callback(self, value):
        if self.callback:
            self.callback(self.value())

    def register_callback(self, callback):
        self.callback = callback

    def value(self):
        return self.dial.value()

    def set_value(self, value):
        self.dial.setValue(value)

    def run(self):
        self.group.show()
        self.app.exec_()


class FakeSensor:
    def __init__(self):
        thread = threading.Thread(target=self.run)
        thread.daemon = True
        thread.start()

    def run(self):
        self.sensor = ActualFakeSensor()
        self.sensor.run()


if __name__ == '__main__':
    sensor = FakeSensor()

    from time import sleep
    while True:
        print('x')
        sleep(1)

改編箇所は3か所ある.1つ目は他のファイルも同様だが,Python3を使っているため,#!/usr/bin/env python3としている.2つ目と3つ目はプログラムに使うライブラリのインポート部分である.PySideにはいくつかバージョンがあり,そのバージョンに応じてサポートしているPythonのバージョンも変わる.はじめPySideとなっていたが,これはPython3.5あたりまでのサポートのようで,それ以降はhttps://pypi.org/project/PySide6/ などを参考に必要なバージョンをインストールする必要がある.ここでは,Python3.85に適したPySide6を取り入れた.3つ目は,from PySide6.QtWidgets import *の追記である.どうやらPySide6では,QApplicationという関数はQtWidgetsの中にあるようだ.(https://medium.com/weekly-python/getting-started-writing-qt-6-applications-in-python-with-pyside6-389ee4c384ee)

アクチュエータ

センサと変わらない.APIを熟知したうえでラッパーを設計する.
ここでは,1つのプログラムの中に,3つのアクチュエーションを実装している.それらについて1つ1つ特徴を見てから実装を示す.

特徴

連続的なアクチュエーション(トピック)

次の要求が来る前に,今の要求を完了可能である(アクチュエータによるバッファリングはされない)必要がある.そうでない場合には,ほかの設計のほうが適切.

不定期で瞬間的なアクチュエーション(サービス)

トピックを介して送られたメッセージが受信されなかったり,実行されない可能性が常にある(確率は低い).ROSトピックは,送信を保証するTCPソケット上に作られているが,購読する側のバッファーがオーバーフローした場合には,メッセージが失われるかもしれない.たまにしかコマンドを送信しない場合には,コマンドのうちの1つが失われてしまうのは大きな問題で,コマンドがすべて伝わったかどうか確認したくなる.この場合には,サービスコールを使用すべき.これにより,アクチュエータにコマンドを配信し,確認応答を取得するまで待機することができる.しかし,コマンドの実行に時間がかかったり,終了するまで待ちたくない場合には,問題となる可能性がある.⇒アクションインタフェースを使用.

不定期で時間のかかるアクチュエーション(アクション)

アクションインタフェースはサービスインタフェースに似ているが,非同期であるから,呼び出し側のノードがコマンドの終了を待つ必要がない.ナビゲーションスタックがそのよい例である.ロボットが目標位置に移動するまで待っていたくはないが,ロボットがいつそこに着いたかは知りたい.

実装

ここでは,srvファイルとactionファイル1つずつ定義したものを使用している.

Light.srv
bool on
---
bool status
Rotation.action
float32 orientation
---
float32 final_orientation
---
float32 current_orientation
ソースコード
actuator.py
#!/usr/bin/env python3

from fake_actuator import FakeActuator

import rospy
import actionlib
from std_msgs.msg import Float32

# BEGIN IMPORT
from sensor_actuator.srv import Light,LightResponse
from sensor_actuator.msg import RotationAction,RotationFeedback,RotationResult
# END IMPORT


##トピック:連続なアクチュエーション
# BEGIN TOPIC_CALLBACK
def volume_callback(msg):
    actuator.set_volume(min(100, max(0, int(msg.data * 100))))
# END TOPIC_CALLBACK


##サービス:不定期で瞬間的なアクチュエーション
# BEGIN SERVICE_CALLBACK
def light_callback(request):
    actuator.toggle_light(request.on)
    return LightResponse(actuator.light_on())
# END SERVICE_CALLBACK


##アクション:不定期で時間のかかるアクチュエーション
# BEGIN ACTION_CALLBACK
def rotation_callback(goal):
    feedback = RotationFeedback()
    result = RotationResult()

    actuator.set_position(goal.orientation)
    success = True

    rate = rospy.Rate(10)
    while fabs(goal.orientation - actuator.position()) > 0.01:
        if a.is_preempt_requested():
            success = False
            break;

        feedback.current_orientation = actuator.position()
        a.publish_feedback(feedback)
        rate.sleep()

    result.final_orientation = actuator.position()
    if success:
        a.set_succeeded(result)
    else:
        a.set_preempted(result)
# END ACTION_CALLBACK


if __name__ == '__main__':
    actuator = FakeActuator() #FakeActuatorのクラスインスタンスを作成

    # Initialize the node
    rospy.init_node('fake')

    # Topic for the volume
    t = rospy.Subscriber('fake/volume', Float32, volume_callback) #音量コマンドのためのトピックを購読

    # Service for the light
    s = rospy.Service('fake/light', Light, light_callback) #ライトを制御するサービスを公開

    # Action for the position
    a = actionlib.SimpleActionServer('fake/position', RotationAction, #回転位置を制御するアクションを公開
                                     execute_cb=rotation_callback,
                                     auto_start=False)
    a.start()

    # Start everything
    rospy.spin()
# END ALL

トピックを購読できている
actuator_topic_info
サービスがうまく働いている.
actuator_service
アクションについてもうまく反映されている.
actuator_topic

ソースコード:アクチュエータの定義

参考文献において,作成されていたデモ用アクチュエータのソースコードを示す.改編箇所はセンサのときと同じであるから説明は省く.

fake_actuator.py
#!/usr/bin/env python3

import sys

from PySide6.QtCore import * 
from PySide6.QtGui import * 
from PySide6.QtWidgets import * #QApplicationを含むライブラリ


class FakeActuator:
    def __init__(self, callback=None):
        self.callback = callback

        self.app = QApplication(sys.argv)

        self.vlayout = QVBoxLayout()

        self.button = QPushButton('light')
        self.button.pressed.connect(self._button_callback)
        self.button.setCheckable(True)
        self.button.setChecked(True)
        self.button.setStyleSheet('background-color: white')
        self.vlayout.addWidget(self.button)

        self.slider = QSlider()
        self.slider.setOrientation(Qt.Horizontal)
        self.vlayout.addWidget(self.slider)

        self.dial = QDial()
        self.dial.setNotchesVisible(True)
        self.dial.setWrapping(True)
        self.vlayout.addWidget(self.dial)

        self.quit = QPushButton('Quit')
        self.quit.clicked.connect(self.app.quit)
        self.vlayout.addWidget(self.quit)

        self.group = QGroupBox('Fake Actuator')
        self.group.setLayout(self.vlayout)

    def _button_callback(self):
        if self.button.isChecked():
            self.button.setStyleSheet('background-color: red')
        else:
            self.button.setStyleSheet('background-color: white')

    def light_on(self):
        return self.button.checked()

    def toggle_light(self, on):
        self.button.setChecked(on)

    def volume(self):
        return self.slider.value()

    def set_volume(self, value):
        self.slider.setValue(value)

    def position(self):
        return self.dial.value()

    def set_position(self, value):
        self.dial.setValue(value)

    def run(self):
        self.group.show()
        self.app.exec_()


if __name__ == '__main__':
    a = FakeActuator()
    a.run()

感想

私がROSコミュニティに貢献できるほどの力を持っているかは別として,今回は何かしら新たなものを構築できたときに反映させるような概要を学んだ.それを通して,トピック・サービス・アクションについての特徴と使い分けについても改めて学ぶことができた.また,非常に恩恵を受けているROSコミュニティは,たくさんの貢献する人々によって構築され,ROSを強化しているということを知るだけでもROSを使っていくにあたって大切だと思う.

参考文献

プログラミングROS Pythonによるロボットアプリケーション開発
        Morgan Quigley, Brian Gerkey, William D.Smart 著
                       河田 卓志 監訳
            松田 晃一,福地 正樹,由谷 哲夫 訳
                  オイラリー・ジャパン 発行

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1