#1.はじめに
横河電機の「リアルタイムOSコントローラ(e-RT3)」における入出力モジュールの制御を、ロボット制御に活用されているROSから制御する方法をまとめてみました。
ここでは簡単な制御の例として、ROS2を使ってシグナルタワーを歩行者信号機のように点灯させることを目標としています。
リアルタイムOSコントローラ e-RT3 関連記事
第1回 | セットアップ編 |
第2回 | 入出力ユニット編 (PythonとC言語から制御) |
第3回 | Elixirから制御編 |
第4回(今回) | ROS2から制御編 |
第5回 | Rustから制御編 |
第6回 | Goから制御編 |
#3.考え方
名称 | (クラス名) | 役割 | |
---|---|---|---|
/ert3pub | Pub | ノード | 歩行者信号機の動作のメッセージを送出する |
mes_ert3pub | トピック | - | |
DioMsg | メッセージ | e-RT3のリレー出力制御用のカスタムメッセージ | |
/ert3sub | Sub | ノード | メッセージを受けて、PLCDyに操作対象のリレー番号とON/OFF状態を渡す |
PlcDy | クラス |
libm3.so のAPIを操作するPythonのクラス |
|
libm3.so | ライブラリ | e-RT3のIOユニット制御のライブラリ(メーカから提供) |
配線
ランプ色 | YD接続先 | 端子台番号 |
---|---|---|
赤色 | YD 01 | 000 |
赤色 | YD 02 | 001 |
- 端子台:PCXV-1H40-TB40-O3
- YD出力ユニット:F3YD32-1A (p.105)
端子台の0V
と+V
にDC24Vのアダプタで電源を供給しています。
動作の様子
下記の順番で点灯します。
順番 | 赤ランプ | 緑ランプ | 時間(sec) |
---|---|---|---|
1 | 点灯 | 消灯 | 5 |
2 | 消灯 | 点灯 | 5 |
3 | 点灯 | 点滅 | 0.5 間隔 |
(最初に戻る) | 点灯 | 消灯 | - |
#4.ROS2のインストール
別記事を参照してください。
#5.ソースコード
長くなるので、主要な部分だけ抜粋してます。
全てのソースファイルはGithubに置いてありますので、こちらを参照してください。
(ファイル置いただけですので、読みにくくてすみません・・・)
ビルド方法
$ cd ~/ros2_ws/src
#クローン
$ git clone https://github.com/trihome/eRT3_ros2.git
$ cd eRT3_ros2
#ビルド
$ cd ~/ros2_ws/src
$ colcon build --symlink-install
実行
roslaunchから立ち上げる場合
#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 launch ert3py ert3py.launch.py
ros2 runで個別に立ち上げる場合
コンソール1
#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 run ert3py sub
コンソール2
#環境変数の読み込み
~/ros2_ws/src$ source ./install/setup.bash
#起動
~/ros2_ws/src$ ros2 run ert3py pub
各ソースの抜粋
今回はPythonを使ってるので、ハードウェア制御に関しては過去記事を参考に作成しました。
出力制御
ert3py/node/Sub.py(抜粋)
import ctypes
class PlcDy():
"""
e-RT3出力制御
"""
# ライブラリのフルパス
__LIBM3 = "/usr/local/lib/libm3.so.1"
def __init__(self, slot, unit=0):
"""
コンストラクタ
Parameters
----------
slot : int
YDを設置しているスロット番号を指定
unit : int
ユニット番号を指定
"""
# ライブラリ読み込み
self.__libc = ctypes.cdll.LoadLibrary(self.__LIBM3)
# pythonの変数をint型に変換
self.__c_unit = ctypes.c_int(unit)
self.__c_slot = ctypes.c_int(slot)
# 全消灯
self.write(0x0000, 0x0000)
def __del__(self):
"""
デストラクタ
"""
# 全消灯
self.write(0x0000, 0x0000)
def write(self, data_upper, data_lower, mask_upper=0xffff, mask_lower=0xffff, pos=1, num=2):
"""
リレー出力
Parameters
----------
data_upper : int
上位2Byteの出力データ
data_lower : int
下位2Byteの出力データ
mask_upper : int
上位2Byteのマスクデータ
mask_lower : int
下位2Byteのマスクデータ
pos : int
リレー番号(操作先頭の番号)
num : int
書き込みブロック数
"""
# リレー番号
# ブロック読み込みの場合は1, 17, 33, (16n+1)・・・を指定
c_pos = ctypes.c_int(pos)
# 書き込みブロック数(1ブロック16点)
c_num = ctypes.c_int(num)
# 書き込みデータ格納バッファ用のshort型配列を作る
# 要素数4のshort型の配列
short_arr = ctypes.c_uint16 * 4
# 32 x 2 点分の書き込みデータ作成
data = [data_lower, data_upper]
c_data = short_arr(*data)
# 32 x 2 点分のマスクデータ作成
mask = [mask_lower, mask_upper]
c_mask = short_arr(*mask)
# ライブラリ関数呼び出し
self.__libc.writeM3OutRelay(
self.__c_unit, self.__c_slot, c_pos, c_num, c_data, c_mask)
def write1(self, pos, val):
"""
リレー出力・1ch単位
Parameters
----------
pos : int
リレー番号
val : int
値(0:OFF, 1>=:ON)
"""
wpos = 0x01
if 0 < pos <= 16:
# 1~16のとき
# ビットシフト準備
pos -= 1
# ビットシフト
wpos = wpos << pos
if val > 0:
# 値が1以上の時はON
self.write(0, wpos, 0, wpos)
else:
# 値がゼロの時はOFF
self.write(0, 0, 0, wpos)
elif 16 < pos <= 32:
# 17~32のとき
# ビットシフト準備
pos = pos - 16 - 1
# ビットシフト
wpos = wpos << pos
if val > 0:
# 値が1以上の時はON
self.write(wpos, 0, wpos, 0)
else:
# 値がゼロの時はOFF
self.write(0, 0, wpos, 0)
else:
# リレー番号を超えたとき
print(" * ERR: out of range")
ROS関連ソース
ROS側のソースの作り方は、過去記事を参考にしています。
ert3_mes/msg/DioMsg.msg
int16 relay #リレー出力番号
int16 value #出力状態 0:OFF 1>:ON
ert3py/node/Sub.py(抜粋)
import rclpy
from rclpy.node import Node
# カスタムメッセージ
from ert3_mes.msg import DioMsg
# PLC出力制御
from node.PlcDy import PlcDy
class MySubscription(Node):
"""
受信側
"""
# ノード名
SELFNODE = "ert3sub"
# トピック名
SELFTOPIC = "mes_" + SELFNODE
def __init__(self):
"""
コンストラクタ
"""
# ノードの初期化
super().__init__(self.SELFNODE)
# コンソールに表示
self.get_logger().info(f"{self.SELFNODE} initializing...")
# subscriptionインスタンスを生成
self.sub = self.create_subscription(
DioMsg, "mes_ert3pub", self.callback, 10)
# コンソールに表示
self.get_logger().info(f"{self.SELFNODE} do...")
# PLC操作
self.plcdy = PlcDy(2)
def callback(self, message):
"""
コールバック関数
Parameters
----------
message : DioMsg
メッセージ
"""
# 受け取ったメッセージの表示
self.get_logger().info(
f"Subscription > Relay: {message.relay} Value: {message.value}")
# PLCの操作
self.plcdy.write1(message.relay, message.value)
ert3py/node/Pub.py(抜粋)
import rclpy
from rclpy.node import Node
# カスタムメッセージ
from ert3_mes.msg import DioMsg
class MyPublisher(Node):
"""
送信側
"""
# ノード名
SELFNODE = "ert3pub"
# トピック名
SELFTOPIC = "mes_" + SELFNODE
def __init__(self):
"""
コンストラクタ
"""
# ノードの初期化
super().__init__(self.SELFNODE)
# コンソールに表示
self.get_logger().info(f"{self.SELFNODE} initializing...")
# publisherインスタンスを生成
self.pub = self.create_publisher(DioMsg, self.SELFTOPIC, 10)
# タイマーのインスタンスを生成(1秒ごとに発生)
self.create_timer(0.5, self.callback)
# シーケンス番号をリセット
self.sequence = 0
# コンソールに表示
self.get_logger().info(f"{self.SELFNODE} do...")
def __del__(self):
"""
デストラクタ
"""
# コンソールに表示
self.get_logger().info(f"{self.SELFNODE} done.")
def callback(self):
"""
タイマーの実行部・歩行者信号の動作
"""
# シーケンス制御
if self.sequence == 0:
# 初期化(一度だけ実行)
pass
if self.sequence == 1:
# 赤色点灯
self.sendmsg(1, 1)
# 緑色消灯
self.sendmsg(2, 0)
elif self.sequence == 10:
# 赤色消灯
self.sendmsg(1, 0)
# 緑色点灯
self.sendmsg(2, 1)
elif self.sequence in [21, 23, 25, 27, 29]:
# 緑色消灯
self.sendmsg(2, 0)
elif self.sequence in [22, 24, 26, 28]:
# 緑色点灯
self.sendmsg(2, 1)
elif self.sequence == 30:
# 赤色点灯
self.sendmsg(1, 1)
# 緑色消灯
self.sendmsg(2, 0)
elif self.sequence > 30:
# シーケンス終了
self.sequence = 0
# シーケンス番号をインクリメント
self.sequence += 1
def sendmsg(self, relay, value):
"""
メッセージの送信
Parameters
----------
relay : int
リレー番号を指定
value : int
0:消灯、1>:点灯
"""
# 送信するメッセージの作成
msg = DioMsg()
msg.relay = relay
msg.value = value
# 送信
self.pub.publish(msg)
# ログの表示
self.get_logger().info(
f"Publish [{self.sequence} / relay: {relay}, val: {value}]")
#4.おわりに
前回はElixirで試しましたが、ROSに関しても、Raspberry PiやJetsonの感覚で、カンタンに立ち上げることが出来ました。
やはり”素の”Ubuntu Linuxが普通に動くのは有り難いですね~