こんにちは!この記事では、 株式会社Octa Roboticsが開発・提供している ロボット・設備間連携に特化したマルチベンダー型のインターフェースサービス「LCI」 について、ロボットからの利用方法を連載形式で紹介していきます。
前回と前々回の記事では、通信プロトコルの「エレベーター状態取得(RequestElevaotrStatus)」の実装を行いました。
今回は、「乗り込み・乗り込み完了(RobotStatus)」について実装を行っていきます。
シーケンスのおさらい
乗り込み・乗り込み完了(RobotStatus)のシーケンスについて、再確認していきましょう。
乗り込み・乗り込み完了では、それぞれ以下の手順でロボットとエレベーター(LCI)間でMQTT通信を行います。
乗り込み
- ロボットからエレベーターにRobotStatus(State:5)を送信
- エレベーターからロボットにRobotStatusResultを送信
- ロボットから乗り込み完了を送るまで、5秒間隔で実施
乗り込み完了
- ロボットからエレベーターにRobotStatus(State:1)を送信
- エレベーターからロボットにRobotStatusResultを送信
各シーケンスでresultが1の場合は成功、RobotStatusResultのresultが2、3、99のいずれかの場合は失敗とみなします。
次は、トピック(メッセージの宛先)とペイロード(メッセージの中身)を確認していきましょう。
プロトコルの仕様
RobotStatusとRobotStatusResultのプロトコルの仕様を確認していきます。
トピック
| プロトコル | トピック |
|---|---|
| RobotStatus | /lci/<bldg_id>/<bank_id>/<elevator_id>/RobotStatus/<robot_id> |
| RobotStatusResult | /lci/<bldg_id>/<bank_id>/<elevator_id>/RobotStatusResult/<robot_id> |
第3回で使用した設定ファイル:server_config_simulator_1-1.yamlを利用する場合には、以下になります。
| プロトコル | トピック |
|---|---|
| RobotStatus | /lci/simulator/1/1/RobotStatus/_uClosc2 |
| RobotStatusResult | /lci/simulator/1/1/RobotStatusResult/_uClosc2 |
- bldg_id : simulator(設定ファイル内のlci_bldg_id)
- bank_id : 1(設定ファイル内のlci_bank_id)
- elevator_id : 1(設定ファイル内のlci_elevator_id)
- robot_id : _uClosc2(開発用ロボットアカウントのID)
ペイロード
| プロトコル | ペイロード |
|---|---|
| RobotStatus | { "state": <state>, "timestamp": <timestamp>, "robot_id":<robot_id> } |
| RobotStatusResult | { "result": <result>, "timestamp": <timestamp>, "requested_robot_id": <requested_robot_id>, "requested_timestamp": <requested_timestamp> } |
- timestamp : RobotStatusもしくはRobotStatusResultを送信した時間(エポック秒)
- robot_id : RobotStatusを行うロボットのID(今回は_uClosc2)
- result : RobotStatusの結果(1:成功、2:失敗、3:その他エラー、99:管制運転中)
- requested_robot_id : RobotStatusを行うロボットのID(今回は_uClosc2)
- requested_timestamp : RobotStatusが送られてきた時間
ペイロード内のstateは、エレベーター乗降時のロボットの状態を示します。
具体的には、ロボットの状態に合わせて以下から選択します。
| 値 | 状態 |
|---|---|
| 1 | 乗り込み完了 |
| 2 | 降車完了 |
| 3 | 乗り込み中止 |
| 4 | 降車中止 |
| 5 | ドア開継続要求 |
サンプルコードの確認
サンプルコードの乗り込み、乗り込み完了に関連する箇所を確認していきましょう。
class RobotStatus
このクラスでは、RobotStatus内のstateの値を変数として定義しています。
class RobotStatus(Enum):
HAS_ENTERED = 1
HAS_GOT_OFF = 2
CANCEL_TO_ENTER = 3
CANCEL_TO_GET_OFF = 4
KEEP_DOOR_OPEN = 5
do_robot_status
この関数では、以下のことを行っています。
- ペイロードの作成
- トピックの作成
- メッセージのパブリッシュ(
self._publish_and_wait_response) - RobotStatusResultのレスポンスを待つ(
self._publish_and_wait_response)
def robot_status(self, robot_status: RobotStatus, time_out: int=10):
if not self.is_registered():
return LciResponse()
requested_time = time.time()
payload = json.dumps({
'state': robot_status.value,
'timestamp': requested_time,
'robot_id': self.robot_id})
api = 'RobotStatus'
topic = f'{self.topic_prefix}/{self.elevator_id}/{api}/{self.robot_id}'
res = self._publish_and_wait_response(topic, payload, api, requested_time, time_out)
return res
メイン処理
エレベーターの乗り込みと乗り込み完了時の合計2回do_robot_statusを呼び出します。
関数を呼び出す際に引数には、それぞれ以下を指定します。
- 乗り込み:
RobotStatus.KEEP_DOOR_OPEN(5) - 乗り込み完了:
RobotStatus.HAS_ENTERED(1)
if __name__ == '__main__':
#--省略--#
#dry runによる疎通確認
res = lci_client.do_registration(True)
lci_client.logger.debug(res)
time.sleep(1)
#出発階への呼び出し
res = lci_client.do_call_elevator('B1', '1F')
lci_client.logger.debug(res)
time.sleep(1)
#エレベーターへの状態取得
lci_client.start_request_elevator_status()
lci_client.logger.debug("Start Request Elevator Status")
time.sleep(1)
#乗り込み
res = lci_client.do_robot_status(RobotStatus.KEEP_DOOR_OPEN)
lci_client.logger.debug(res)
time.sleep(1)
#乗り込み完了
res = lci_client.do_robot_status(RobotStatus.HAS_ENTERED)
lci_client.logger.debug(res)
time.sleep(1)
#行き先階への登録
res = lci_client.do_call_elevator('B1', '1F')
lci_client.logger.debug(res)
time.sleep(1)
#降車
res = lci_client.do_robot_status(RobotStatus.KEEP_DOOR_OPEN)
lci_client.logger.debug(res)
time.sleep(1)
#エレベーター状態取得終了
lci_client.stop_request_elevator_status()
lci_client.logger.debug("End Request Elevator Status")
time.sleep(1)
#降車完了
res = lci_client.do_robot_status(RobotStatus.HAS_GOT_OFF)
lci_client.logger.debug(res)
time.sleep(1)
#利用解除
res = lci_client.do_release()
lci_client.logger.debug(res)
time.sleep(1)
サンプルコードの改良
サンプルコード(lci_client.py)では、シーケンスの乗り込み・乗り込み完了について以下の箇所が未実装です。
- ElevatorStatus内の
floor(エレベーターの現在階)とdoor(ドアの開閉情報)を確認し、乗り込みを行ってよいか判断する - 乗り込み完了および乗り込み中止まで5秒間隔でRobotStatus(state:5)をループする
- 一定時間内に乗り込みができない場合は、乗り込み中止(state: 3)を送信する
- RobotStatusResult内のresultが2、3、99だった場合のエラー処理
これらの処理を行えるようにサンプルコードを改良していきましょう。
resultの意味と発生条件
はじめにRobotStatusResult内のresultの意味と発生条件について確認していきましょう。
| 評価順 | <result>の値 | 意味 | 発生条件 |
|---|---|---|---|
| 1 | 3 | その他エラー | ペイロードのフィールドの一部又は全部が設定されていない。 |
| 2 | 3 | その他エラー | トピック名の robot_id と、ペイロードの robot_id が一致しない。 |
| 3 | 99 | 管制運転中 | エレベーターが管制運転中のため連携不能。 |
| 4 | 3 | その他エラー | エレベーターがロボット連携モードにも関わらず、ロボットが利用登録されていない(状態のミスマッチ)。 |
| 5 | 2 | 拒否(失敗) | ・エレベーターがロボット連携モードになっていない ・他のロボットが利用中 ・エレベーターがロボット連携モードに遷移中 ・エレベーターがロボット連携モードを解除中 |
| 6 | 3 | その他エラー | ・state の値が定義外 ・エレベーターが出発階で戸開待機しているとき以外で、「乗り込み完了」を送信した ・エレベーターが出発階/行き先階で戸開待機しているとき以外で、「ドア開継続要求」を送信した |
| 7 | 1 | 成功 | ロボットからの通知が受理された。 |
各resultの値に応じたロボットの動作
先程の表をもとに、resultの値に対してロボットがどのように動作するかを決めていきます。
今回は仕様書をもとに以下のように定めます。
乗り込み(ドア開継続要求)
| <result> | <result>の意味 | ロボット側の動作 |
|---|---|---|
| 1 | 成功 | エレベーターに対する乗り込みの実施 |
| 2 | 拒否(失敗) | 異常シーケンスに遷移 |
| 3 | その他エラー | 異常シーケンスに遷移 |
| 99 | 管制運転中 | 異常シーケンスに遷移 |
乗り込み完了
| <result> | <result>の意味 | ロボット側の動作 |
|---|---|---|
| 1 | 成功 | 次のステップに進む |
| 2 | 拒否(失敗) | 異常シーケンスに遷移 |
| 3 | その他エラー | 異常シーケンスに遷移 |
| 99 | 管制運転中 | 異常シーケンスに遷移 |
乗り込み中止
| <result> | <result>の意味 | ロボット側の動作 |
|---|---|---|
| 1 | 成功 | 異常シーケンスに遷移 |
| 2 | 拒否(失敗) | 異常シーケンスに遷移 |
| 3 | その他エラー | 異常シーケンスに遷移 |
| 99 | 管制運転中 | 異常シーケンスに遷移 |
RobotStatus state:3 (乗り込み中止) をLCIが受け付けた場合は、Releaseと同じリセット効果がある為、ドアが閉じます。
フロー図
動作ごとに分けてフロー図を作成します。
エレベーター呼び出し階到着&ドア開確認
- (1) 経過時間teを確認
- 180sを超えていた場合、異常シーケンスに遷移
- (2) 出発階にエレベーターが到着したか確認
- (3) ドアが開状態か確認
- (4) 両方確認できた場合、ドア開継続要求に進む
ドア開継続要求
- (1) 経過時間teを確認
- 180sを超えていた場合は異常シーケンスに遷移
- (2) stop event(=threading.Event())がsetされているか確認
- setされていた場合はループを終了し次のシーケンスに遷移
- (3) RobotStatus(state:5)を送信
- (4) RobotStatusResultを受信したか確認
- 待機時間 tw が10sを超えた場合、異常シーケンスに遷移
- (5) RobotStatusResultのresultの値が1か確認
- resultの値が2, 3, 99の場合は異常シーケンスに遷移
- (6) 5秒間待機
RobotStatus state:5 はRFA規格で必須ですが、エレベーターが行き先階で到着して戸開したとき、ネットワーク接続がなかなか回復しない場合には送信できない可能性があります。
LCIは、内部的に出発階、行き先階への到着と戸開を監視しており、戸開と同時に開状態を維持するよう動作するため、RobotStautus state:5が送られなくても、実際にはドアが閉まるおそれはありません。
乗り込み中止
- (1)RobotStatus state:3の合計送信回数を確認
- 5回を超えた場合は、異常シーケンスに遷移
- (2)RobotStatus state:3を送信
- (3)RobotStatusResultを受信したか確認
LCI側で正しいログを出力するために、レスポンスが一定時間内に来ない場合にはメッセージの再送を行います。
乗り込み成功(RobotStatus state:1)
- (1) RobotStatus state:1 を送信
- (2) RobotStatusResultを受信したか確認
- 待機時間twが10sを超えてもメッセージを受信できない場合は異常シーケンスに遷移
- (3)RobotStatusResultのresultの値が1か確認
- (4)resultの値が2, 3, 99の場合は異常シーケンスに遷移
実装したコード
関数の役割と依存関係
実装したコードで追加した役割と依存関係を確認します。
-
start_robot_status_open_hold
-
robot_status_open_hold_workerをスレッド化し実行
-
-
stop_robot_status_open_hold
- ループ停止のフラグをセット
-
robot_status_open_hold_worker
- 5秒間隔で
robot_statusを実行 - ループ停止のフラグが立ったら、ループ終了
- 5秒間隔で
-
wait_for_arrival
- エレベーターが出発階に到着したか・ドアが開いたか確認
- どちらも達成したら、Trueを返す
-
do_robot_status_success_enter
- 乗り込み完了(state:1)を送信
- RobotStatusResultのResultが1の場合は次のシーケンスを実行
-
robot_status_cancel_enter
-
robot_status_repeat_commandを実行 - 5回再送を行うよう引数に設定
-
-
robot_status_repeat_command
-
robot_status関数を希望のresult値が返ってくるまで再送する
-
-
robot_status
- ペイロードの作成
- トピックの作成
- メッセージのパブリッシュ
- RobotStatusResultのレスポンスを待つ・レスポンスを呼び出した関数に返す
プログラム
追加した変数
-
lock_robot_status: 乗り込み完了/中止送信後に、ドア開継続要求を送信しないように排他ロックを行う -
stop_event_robot_status:ドア開継続要求のループ終了フラグ
class LciClient:
def __init__(self):
self.logger = logging.getLogger('lci_client')
self.mqtt_client = None
self.robot_id = None
self.timestamp = None
self.elevator_id = None
self.registered = False
self.timeout_sec = None
self.floor_list = []
#追記 registration
self.dry_run_registered = False
#追記 request elevator status
self.floor_value = None
self.door_value = None
self.stop_event = threading.Event()
#追記 request elevator status 2
self.mqtt_lock = threading.Lock()
self.response_queue = queue.Queue()
self.message_queue = queue.Queue()
#追記 robotstatus
self.lock_robot_status = threading.Lock()
self.stop_event_robot_status = threading.Event()
start_robot_status_open_hold
robot_status_open_hold_workerをスレッドとして実行
def start_robot_status_open_hold(self):
thread_robot_status = threading.Thread(target = self.robot_status_open_hold_worker, daemon=True)
thread_robot_status.start()
stop_robot_status_open_hold
self.stop_event_robot_statusをsetします。
def stop_robot_status_open_hold(self):
self.stop_event_robot_status.set()
robot_status_open_hold_worker
- 5秒間隔で
RobotStatus.KEEP_DOOR_OPENを送信(この間をロボットは乗り込み動作を行う) -
self.stop_event_robot_statusがsetされたらループを終了する
無限ループが生じることを防ぐために、180秒を経過した場合はループを終了します。
def robot_status_open_hold_worker(self):
time_out = 10
limit_time = self.timeout_sec + time.time()
while True:
if time.time() < limit_time:
with self.lock_robot_status:
if self.stop_event_robot_status.is_set():
break
# 乗り込み
res = self.robot_status(RobotStatus.KEEP_DOOR_OPEN, time_out)
if res is not None:
if res.result != 1:
self.registered = False
self.logger.error(f"Robot Status(state:5) is failed. code : {res.result}")
break
else:
self.logger.debug(res)
else:
self.registered = False
self.logger.error(f"Robot Status(state:5) is None")
break
self.stop_event_robot_status.wait(5)
else:
self.registered = False
self.logger.error(f"Robot Status(state:5) is time up")
break
wait_for_arrival
- エレベーターが出発階に到着したか確認
- ドアが開状態であるかを確認
- どちらも達成している場合は
Trueを返す
def wait_for_arrival(self, target_floor: str, target_door: int = 1):
#エレベーター到着確認
limit_time = self.timeout_sec + time.time()
while True:
if time.time() < limit_time:
if self.floor_value == target_floor \
and self.door_value == target_door:
self.logger.info("Elevator arrived")
self.logger.info("Door opened")
enable = True
break
else:
self.registered = False
if self.floor_value != target_floor:
self.logger.error("Elevator hasn't arrived")
if self.door_value != target_door:
self.logger.error("Door hasn't opened")
enable = False
break
time.sleep(0.05)
limit_time -= 0.05
return enable
do_robot_status_success_enter
-
robot_status関数において、ペイロード内のstateをRobotStatus.HAS_ENTERED
としてメッセージのパブリッシュ -
resultが1の場合は次のシーケンスに、それ以外は異常シーケンスに遷移
def do_robot_status_success_enter(self, time_out: int=10):
res = self.robot_status(RobotStatus.HAS_ENTERED, time_out)
if res is not None:
if res.result != 1:
self.registered = False
self.logger.error(f"Robot Status(state:1) is failed code : {res.result}")
else:
self.logger.info(f"Robot succeeded in entering elevator")
else:
self.registered = False
self.logger.error(f"Robot Status(state:1) code : None")
return res
do_robot_status_cancel_enter
robot_status_repeat_commandを実行します。
def do_robot_status_cancel_enter(self, time_out: int=10):
res = self.robot_status_repeat_command(RobotStatus.CANCEL_TO_ENTER, 5, 5, (1,2,3,99))
if res is not None:
self.logger.info(f"Robot aborted entering the elevator")
else:
self.logger.info(f"Robot Status(state:3) code : None")
self.registered = False
return res
robot_status_repeat_command
-
robot_statusを実行 -
result値が引数のsuccess_resultのいずれかの値と同じか確認 - 同じ値がある場合はループを終了、同じ値がない場合は引数の
interval経過後に再度robot_statusを送信 -
repeat_timesに達するまで繰り返す
def robot_status_repeat_command(self, state: str, repeat_times: int, interval: float, success_result: tuple[int, ...], time_out: int=10):
"""
state:1~5 (ドア開継続要求など)
repeat_times: 最大再送回数
interval:送受信間隔
success_result:ループを終了するresultの値
time_out: レスポンス待機時間
"""
send_times = 0
while send_times < repeat_times:
res = self.robot_status(state, time_out)
if res is not None:
if res.result in success_result:
return res
time.sleep(interval)
send_times += 1
robot_status
def robot_status(self, robot_status: RobotStatus, time_out: int=10):
if not self.is_registered():
return LciResponse()
requested_time = int(time.time() * 1000)/1000
payload = json.dumps({
'state': robot_status.value,
'timestamp': requested_time,
'robot_id': self.robot_id})
api = 'RobotStatus'
topic = f'{self.topic_prefix}/{self.elevator_id}/{api}/{self.robot_id}'
res = self._publish_and_wait_response(topic, payload, api, requested_time, time_out)
return res
メイン処理(if __name__=='__main__':)
- エレベーターの状態取得開始後に、
wait_for_arrival関数を実行 - 返り値
resがTrueの場合は、start_robot_status_open_hold関数を実行 - 20秒経過したら、
do_robot_status_success_enter(乗り込み完了)もしくはdo_robot_status_cancel_enter(乗り込み中止)を実行
今回は20秒かけて、ロボットがEV乗り込み動作を行うものとしました。
乗り込み中止(RobotStatus state:3)は、利用解除(Release)と同様の効果があります。
そのため、乗り込み中止送信後は利用解除を省略することができます。
(なお、利用解除を送信しても問題はありません)
# Main routine
if __name__ == '__main__':
#--省略--#
#エレベーターの状態取得
lci_client.start_request_elevator_status()
time.sleep(1)
#エレベーターの出発階到着・ドアの開閉状態確認
res = lci_client.wait_for_arrival('B1', 1)
if res:
#乗り込み
lci_client.start_robot_status_open_hold()
"""
~~~ロボットのエレベーター乗り込み動作~~~
ここでロボットによるエレベーターの乗り込み動作を実施します.
今回は20秒かけて、ロボットが乗り込み動作を行ったとみなします。
"""
roop_time = 20 + time.time()
while time.time() < roop_time:
if not lci_client.is_registered:
res = lci_client.do_release()
lci_client.logger.debug(res)
sys.exit(1)
time.sleep(1)
else:
res = lci_client.do_release()
lci_client.logger.debug(res)
sys.exit(1)
#ロボットの乗り込みが終了した場合→True
#ロボットの乗り込みを中止したい場合→False
end_boarding = False
lci_client.logger.info(f"Boarding Elevator : {end_boarding}")
with lci_client.lock_robot_status:
lci_client.stop_robot_status_open_hold()
#乗り込み完了
if end_boarding:
res = lci_client.do_robot_status_success_enter()
lci_client.logger.debug(res)
time.sleep(1)
if not res or not lci_client.is_registered:
res = lci_client.do_release()
lci_client.logger.debug(res)
sys.exit(1)
#乗り込み失敗
else:
res = lci_client.do_robot_status_cancel_enter()
lci_client.logger.debug(res)
sys.exit(1)
#行き先階への登録
res = lci_client.do_call_elevator('B1', '1F')
lci_client.logger.debug(res)
time.sleep(1)
#--省略--#
動作確認
メイン処理のコードを変更し、以下の2ケースに分けて動作を確認します。
- ① 20秒経過後に、
do_robot_status_success_enter()を実行- 乗り込み完了送信後、次のシーケンスに遷移
- ② 20秒経過後に、
do_robot_status_cancel_enter()を実行- 乗り込み中止送信後、異常シーケンスに遷移
乗り込みが完了した場合
コード変更箇所
メイン処理のend_boardingをTrueにします。
if __name__ == '__main__':
#--省略--#
#ロボットの乗り込みが終了した場合→True
#ロボットの乗り込みを中止したい場合→False
end_boarding = True
lci_client.logger.info(f"Boarding Elevator : {end_boarding}")
with lci_client.lock_robot_status:
lci_client.stop_robot_status_open_hold()
#乗り込み完了
if end_boarding:
res = lci_client.do_robot_status_success_enter()
lci_client.logger.debug(res)
time.sleep(1)
if not res or not lci_client.is_registered:
res = lci_client.do_release()
lci_client.logger.debug(res)
sys.exit(1)
#乗り込み失敗
else:
res = lci_client.do_robot_status_cancel_enter()
lci_client.logger.debug(res)
sys.exit(1)
#--省略--#
実行結果
ログから読み取れる動作として、乗り込み完了を送信した後、次のシーケンスである行き先階の登録を行っています。
- ① 13:28:40,529 ~ 13:28:56,096
- 5秒間隔で
RobotStatus : ’KEEP_DOOR_OPEN = 5’を送信
- 5秒間隔で
- ② 13:29:00,617 ~ 13:29:00,868
-
RobotStatus : ‘HAS_ENTERED = 1’を送信
-
- ③ 13:29:01,890 ~ 13:29:02,068
-
CallElevatorの実行
-
・・・
2026-04-06 13:28:40,528 INFO Elevator arrived
2026-04-06 13:28:40,528 INFO Door opened
2026-04-06 13:28:40,529 DEBUG Sending PUBLISH (d0, q1, r0, m10), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (69 bytes)
2026-04-06 13:28:40,625 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (119 bytes)
2026-04-06 13:28:45,645 DEBUG Sending PUBLISH (d0, q1, r0, m13), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:28:45,765 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:28:50,815 DEBUG Sending PUBLISH (d0, q1, r0, m16), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (69 bytes)
2026-04-06 13:28:50,944 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (119 bytes)
2026-04-06 13:28:55,974 DEBUG Sending PUBLISH (d0, q1, r0, m20), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (67 bytes)
2026-04-06 13:28:56,096 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (117 bytes)
2026-04-06 13:29:00,617 INFO Boarding Elevator : True
2026-04-06 13:29:00,617 DEBUG Sending PUBLISH (d0, q1, r0, m23), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:29:00,868 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:29:00,887 INFO Robot successed to enter elevator
2026-04-06 13:29:01,890 DEBUG Sending PUBLISH (d0, q1, r0, m24), 'b'/lci/simulator/1/1/CallElevator/_uClosc2'', ... (140 bytes)
2026-04-06 13:29:02,068 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/CallElevatorResult/_uClosc2', ... (114 bytes)
・・・
乗り込みを中止する場合
コード変更箇所
メイン処理のend_boardingをFalseにします。
if __name__ == '__main__':
#--省略--#
#ロボットの乗り込みが終了した場合→True
#ロボットの乗り込みを中止したい場合→False
end_boarding = False
lci_client.logger.info(f"Boarding Elevator : {end_boarding}")
with lci_client.lock_robot_status:
lci_client.stop_robot_status_open_hold()
#乗り込み完了
if end_boarding:
res = lci_client.do_robot_status_success_enter()
lci_client.logger.debug(res)
time.sleep(1)
if not res or not lci_client.is_registered:
res = lci_client.do_release()
lci_client.logger.debug(res)
sys.exit(1)
#乗り込み失敗
else:
res = lci_client.do_robot_status_cancel_enter()
lci_client.logger.debug(res)
sys.exit(1)
#--省略--#
実行結果
ログから読み取れる動作として、乗り込み中止を送信した後、それ以降のシーケンスを実行していません。
- ① 13:41:12,039 ~ 13:41:27,823
- 5秒間隔で
RobotStatus : ’KEEP_DOOR_OPEN = 5’を送信
- 5秒間隔で
- ② 13:41:32,104 ~ 13:41:32,244
-
RobotStatus : ‘CANCEL_TO_ENTER = 3’を送信
-
2026-04-06 13:41:12,019 INFO Elevator arrived
2026-04-06 13:41:12,019 INFO Door opened
2026-04-06 13:41:12,039 DEBUG Sending PUBLISH (d0, q1, r0, m8), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:41:12,292 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:41:17,312 DEBUG Sending PUBLISH (d0, q1, r0, m11), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (69 bytes)
2026-04-06 13:41:17,457 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (119 bytes)
2026-04-06 13:41:22,480 DEBUG Sending PUBLISH (d0, q1, r0, m14), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:41:22,609 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:41:27,649 DEBUG Sending PUBLISH (d0, q1, r0, m18), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:41:27,823 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:41:32,103 INFO Boarding Elevator : False
2026-04-06 13:41:32,104 DEBUG Sending PUBLISH (d0, q1, r0, m21), 'b'/lci/simulator/1/1/RobotStatus/_uClosc2'', ... (68 bytes)
2026-04-06 13:41:32,244 DEBUG Received PUBLISH (d0, q1, r0, m1), '/lci/simulator/1/1/RobotStatusResult/_uClosc2', ... (118 bytes)
2026-04-06 13:41:32,270 INFO Robot canncel to enter elevator
その他
- 本記事は、LCIプロトコル仕様書(オンライン)のv2025-10-04を参考に作成・加筆しています。
- LCI関連の開発に必要な情報が記載されている開発者用ポータルは以下からアクセスできます。
- 各種資料は以下から閲覧・ダウンロードできます。
- 開発用アカウントの発行は、以下からお問い合わせください。
連載記事リスト
- (1)はじめに・自己紹介・システム概要
- (2)開発用アカウント発行・サンプルコード
- (3)MQTTのブローカー接続・MQTTについて
- (4)エレベーター連携の全体シーケンス
- (5)エレベーター利用登録(Registration)
- (6)エレベーター呼出階・行き先階指定(CallElevator)
- (7)エレベーターの状態取得(RequestElevatorStatus)
- (8)エレベーターの状態取得2(RequestElevatorStatus)
- (9)乗り込み・乗り込み完了(RobotStatus)
他の記事もチェックしてみてください!
次回の記事では、通信プロトコルの「降車・降車完了」についてサンプルコードを改良する予定です!
ご興味があれば「いいね」やフォローをいただけると励みになります!






