14
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

リアルタイムOSコントローラ e-RT3 (ROS2から制御編)

Last updated at Posted at 2020-12-20

#1.はじめに

横河電機の「リアルタイムOSコントローラ(e-RT3)」における入出力モジュールの制御を、ロボット制御に活用されているROSから制御する方法をまとめてみました。
image.png

ここでは簡単な制御の例として、ROS2を使ってシグナルタワーを歩行者信号機のように点灯させることを目標としています。

image.png

リアルタイムOSコントローラ e-RT3 関連記事

第1回 セットアップ編
第2回 入出力ユニット編 (PythonとC言語から制御)
第3回 Elixirから制御編
第4回(今回) ROS2から制御編
第5回 Rustから制御編
第6回 Goから制御編

#3.考え方

大ざっぱな操作の流れのイメージは下記の通りです。
image.png

名称 (クラス名) 役割
/ert3pub Pub ノード 歩行者信号機の動作のメッセージを送出する
mes_ert3pub トピック -
DioMsg メッセージ e-RT3のリレー出力制御用のカスタムメッセージ
/ert3sub Sub ノード メッセージを受けて、PLCDyに操作対象のリレー番号とON/OFF状態を渡す
PlcDy クラス libm3.soのAPIを操作するPythonのクラス
libm3.so ライブラリ e-RT3のIOユニット制御のライブラリ(メーカから提供)

配線

ランプ色 YD接続先 端子台番号
赤色 YD 01 000
赤色 YD 02 001

image.png

端子台の0V+VにDC24Vのアダプタで電源を供給しています。

動作の様子

image.png

下記の順番で点灯します。

順番 赤ランプ 緑ランプ 時間(sec)
1 点灯 消灯 5
2 消灯 点灯 5
3 点灯 点滅 0.5 間隔
(最初に戻る) 点灯 消灯 -

a_320x240_fps5.gif

#4.ROS2のインストール

別記事を参照してください。

#5.ソースコード

長くなるので、主要な部分だけ抜粋してます。
全てのソースファイルはGithubに置いてありますので、こちらを参照してください。
(ファイル置いただけですので、読みにくくてすみません・・・)

ビルド方法

$ cd ~/ros2_ws/src
#クローン
$ git clone https://github.com/trihome/eRT3_ros2.git
$ cd eRT3_ros2

#ビルド
$ cd ~/ros2_ws/src
$ colcon build --symlink-install

実行

roslaunchから立ち上げる場合

#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 launch ert3py ert3py.launch.py

ros2 runで個別に立ち上げる場合

コンソール1
#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 run ert3py sub
コンソール2
#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 run ert3py pub

各ソースの抜粋

今回はPythonを使ってるので、ハードウェア制御に関しては過去記事を参考に作成しました。

出力制御

ert3py/node/Sub.py(抜粋)
import ctypes

class PlcDy():
    """
    e-RT3出力制御
    """

    # ライブラリのフルパス
    __LIBM3 = "/usr/local/lib/libm3.so.1"

    def __init__(self, slot, unit=0):
        """
        コンストラクタ
        Parameters
        ----------
        slot : int
            YDを設置しているスロット番号を指定
        unit : int
            ユニット番号を指定
        """
        # ライブラリ読み込み
        self.__libc = ctypes.cdll.LoadLibrary(self.__LIBM3)
        # pythonの変数をint型に変換
        self.__c_unit = ctypes.c_int(unit)
        self.__c_slot = ctypes.c_int(slot)
        # 全消灯
        self.write(0x0000, 0x0000)

    def __del__(self):
        """
        デストラクタ
        """
        # 全消灯
        self.write(0x0000, 0x0000)

    def write(self, data_upper, data_lower, mask_upper=0xffff, mask_lower=0xffff, pos=1, num=2):
        """
        リレー出力
        Parameters
        ----------
        data_upper : int
            上位2Byteの出力データ
        data_lower : int
            下位2Byteの出力データ
        mask_upper : int
            上位2Byteのマスクデータ
        mask_lower : int
            下位2Byteのマスクデータ
        pos : int
            リレー番号(操作先頭の番号)
        num : int
            書き込みブロック数
        """
        # リレー番号
        # ブロック読み込みの場合は1, 17, 33, (16n+1)・・・を指定
        c_pos = ctypes.c_int(pos)
        # 書き込みブロック数(1ブロック16点)
        c_num = ctypes.c_int(num)
        # 書き込みデータ格納バッファ用のshort型配列を作る
        # 要素数4のshort型の配列
        short_arr = ctypes.c_uint16 * 4
        # 32 x 2 点分の書き込みデータ作成
        data = [data_lower, data_upper]
        c_data = short_arr(*data)
        # 32 x 2 点分のマスクデータ作成
        mask = [mask_lower, mask_upper]
        c_mask = short_arr(*mask)

        # ライブラリ関数呼び出し
        self.__libc.writeM3OutRelay(
            self.__c_unit, self.__c_slot, c_pos, c_num, c_data, c_mask)

    def write1(self, pos, val):
        """
        リレー出力・1ch単位
        Parameters
        ----------
        pos : int
            リレー番号
        val : int
            値(0:OFF, 1>=:ON)
        """
        wpos = 0x01
        if 0 < pos <= 16:
            # 1~16のとき
            # ビットシフト準備
            pos -= 1
            # ビットシフト
            wpos = wpos << pos
            if val > 0:
                # 値が1以上の時はON
                self.write(0, wpos, 0, wpos)
            else:
                # 値がゼロの時はOFF
                self.write(0, 0, 0, wpos)
        elif 16 < pos <= 32:
            # 17~32のとき
            # ビットシフト準備
            pos = pos - 16 - 1
            # ビットシフト
            wpos = wpos << pos
            if val > 0:
                # 値が1以上の時はON
                self.write(wpos, 0, wpos, 0)
            else:
                # 値がゼロの時はOFF
                self.write(0, 0, wpos, 0)
        else:
            # リレー番号を超えたとき
            print(" * ERR: out of range")

ROS関連ソース

ROS側のソースの作り方は、過去記事を参考にしています。

ert3_mes/msg/DioMsg.msg
int16 relay   #リレー出力番号
int16 value   #出力状態 0:OFF 1>:ON
ert3py/node/Sub.py(抜粋)
import rclpy
from rclpy.node import Node
# カスタムメッセージ
from ert3_mes.msg import DioMsg
# PLC出力制御
from node.PlcDy import PlcDy

class MySubscription(Node):
    """
    受信側
    """
    # ノード名
    SELFNODE = "ert3sub"
    # トピック名
    SELFTOPIC = "mes_" + SELFNODE

    def __init__(self):
        """
        コンストラクタ
        """
        # ノードの初期化
        super().__init__(self.SELFNODE)
        # コンソールに表示
        self.get_logger().info(f"{self.SELFNODE} initializing...")
        # subscriptionインスタンスを生成
        self.sub = self.create_subscription(
            DioMsg, "mes_ert3pub", self.callback, 10)
        # コンソールに表示
        self.get_logger().info(f"{self.SELFNODE} do...")
        # PLC操作
        self.plcdy = PlcDy(2)

    def callback(self, message):
        """
        コールバック関数
        Parameters
        ----------
        message : DioMsg
            メッセージ
        """
        # 受け取ったメッセージの表示
        self.get_logger().info(
            f"Subscription > Relay: {message.relay} Value: {message.value}")
        # PLCの操作
        self.plcdy.write1(message.relay, message.value)
ert3py/node/Pub.py(抜粋)
import rclpy
from rclpy.node import Node
# カスタムメッセージ
from ert3_mes.msg import DioMsg

class MyPublisher(Node):
    """
    送信側
    """

    # ノード名
    SELFNODE = "ert3pub"
    # トピック名
    SELFTOPIC = "mes_" + SELFNODE

    def __init__(self):
        """
        コンストラクタ
        """
        # ノードの初期化
        super().__init__(self.SELFNODE)
        # コンソールに表示
        self.get_logger().info(f"{self.SELFNODE} initializing...")
        # publisherインスタンスを生成
        self.pub = self.create_publisher(DioMsg, self.SELFTOPIC, 10)
        # タイマーのインスタンスを生成(1秒ごとに発生)
        self.create_timer(0.5, self.callback)
        # シーケンス番号をリセット
        self.sequence = 0
        # コンソールに表示
        self.get_logger().info(f"{self.SELFNODE} do...")

    def __del__(self):
        """
        デストラクタ
        """
        # コンソールに表示
        self.get_logger().info(f"{self.SELFNODE} done.")

    def callback(self):
        """
        タイマーの実行部・歩行者信号の動作
        """

        # シーケンス制御
        if self.sequence == 0:
            # 初期化(一度だけ実行)
            pass
        if self.sequence == 1:
            # 赤色点灯
            self.sendmsg(1, 1)
            # 緑色消灯
            self.sendmsg(2, 0)
        elif self.sequence == 10:
            # 赤色消灯
            self.sendmsg(1, 0)
            # 緑色点灯
            self.sendmsg(2, 1)
        elif self.sequence in [21, 23, 25, 27, 29]:
            # 緑色消灯
            self.sendmsg(2, 0)
        elif self.sequence in [22, 24, 26, 28]:
            # 緑色点灯
            self.sendmsg(2, 1)
        elif self.sequence == 30:
            # 赤色点灯
            self.sendmsg(1, 1)
            # 緑色消灯
            self.sendmsg(2, 0)
        elif self.sequence > 30:
            # シーケンス終了
            self.sequence = 0

        # シーケンス番号をインクリメント
        self.sequence += 1

    def sendmsg(self, relay, value):
        """
        メッセージの送信
        Parameters
        ----------
        relay : int
            リレー番号を指定
        value : int
            0:消灯、1>:点灯
        """
        # 送信するメッセージの作成
        msg = DioMsg()
        msg.relay = relay
        msg.value = value
        # 送信
        self.pub.publish(msg)
        # ログの表示
        self.get_logger().info(
            f"Publish [{self.sequence} / relay: {relay}, val: {value}]")

#4.おわりに

前回はElixirで試しましたが、ROSに関しても、Raspberry PiやJetsonの感覚で、カンタンに立ち上げることが出来ました。
やはり”素の”Ubuntu Linuxが普通に動くのは有り難いですね~

14
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?