1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Flutter講座(forROS2)02 WebAPIを受けるROSノードの作成

Posted at

環境

この記事は以下の環境で動いています。

項目
CPU Core i5-8250U
Ubuntu 22.04
ROS2 Humble

概要

前回はflutterのサンプルアプリを起動しましたが、これはアプリ単体で動作するものです。ロボットを操作するにはflutterアプリとROSノードを接続する必要があります。
今回はWebAPIとWebsocketでアクセスできるROSノードを作成します。次回以降それにflutterアプリでアクセスします。

WebAPI

WebAPIはHTTPでサーバーのリソースにアクセスするためのインターフェースです。WebAPIの通信では必ずクライアント・サーバーの2者がいて、クライアント(今回はflutterアプリ)がサーバー(今回はROSノード)に「リクエスト」を送り「レスポンス」が返ってくるというという形をとります。ここでは一般的な以下のようなWebAPIを扱います。

  • httpメソッドを用いる
  • URIでリソースを指定する
  • パラメーターをクエリーであらわす

websocket

WebSocketとはクライアントとサーバーの間で対話的な通信を行うことが出来るプロトコルです。
WebSocketは必ずクライアント側から「リクエスト」を送る必要がありますが、Websocketではどちらからでも「メッセージ」を送れます。
今回はこれをサーバー(ROSノード)からクライアント(flutter)へのテレメトリー情報の送信に使用します。

FastAPI

FastAPIはpythonのモジュールで、少ない記述でWebAPIやWebsocketのサーバーとしてふるまうことが出来ます。

ソースコード

flutterからのWebAPIのリクエストを受けたり、websockertのメッセージを送信するノードです。起動の管理からROS2ノードとして動くようにしています。
github上ではこちらapi_server.py

このノードは以下の4つの機能についてWebAPI(&websocket)-ROS間のブリッジをします。

機能名 WebAPI側IF(Websocket) WebAPI側IF(POST) ROS側IF 説明
drawer_select status: 選択子を提示 select/drawer: 選択を指示 action select/drawer 右DrawerのUIで主にモード選択
action_select status: 選択子を提示 select/action: 選択を指示 action select/action FloatingActionのUIで指示を送る
display status: 画面表示情報を送る -- ros parameter 画面表示用の情報を送信 
note status: note情報を送る -- service set_note 一定時間で消えるメッセージの表示
api_lecture/scripts/api_server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import time
import threading
import json
import sys

# FastAPI
import fastapi
import uvicorn
import pydantic
from starlette.middleware.cors import CORSMiddleware
import asyncio

# ROS2
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
from rclpy.action import ActionServer, CancelResponse
from srs_simple_tablet_server_msgs.action  import SelectTask
from srs_simple_tablet_server_msgs.srv import SetNote

####### Data Type #######
class SelectPostData(pydantic.BaseModel):
    goal_id: str
    key: str

app = fastapi.FastAPI(debug=True)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class Ros2ApiServer(Node):
    def __init__(self):
        super().__init__('api_server')

        # drawer_select
        self.drawer_select_action_server = ActionServer(self, SelectTask, '~/select/drawer', self.drawer_select_callback, cancel_callback=self.cancel_accept_callback)
        self.drawer_select_goal_handle_dic = {}
        self.drawer_select_post_data_dic = {}

        # action_select
        self.action_select_action_server = ActionServer(self, SelectTask, '~/select/action', self.action_select_callback, cancel_callback=self.cancel_accept_callback)
        self.action_select_goal_handle_dic = {}
        self.action_select_post_data_dic = {}

        # display
        self.declare_parameter('display.header.name', 'manual')
        self.declare_parameter('display.header.color', 'gray')
        self.declare_parameter('display.board.icon', 'pause')
        self.declare_parameter('display.board.message', 'none')

        # note
        self.note_srv = self.create_service(SetNote, "~/set_note", self.set_note_callback)
        self.note_data_dic = {}
        self.interval_timer = self.create_timer(0.5, self.timer_callback)

        @app.post("/select/drawer")
        async def post_drawer_select(post_data: SelectPostData):
            self.get_logger().info("post_drawer_select: " + str(post_data))
            self.drawer_select_post_data_dic[post_data.goal_id] = post_data
            return fastapi.responses.JSONResponse(status_code=fastapi.status.HTTP_200_OK)

        @app.post("/select/action")
        async def post_action_select(post_data: SelectPostData):
            self.get_logger().info("post_action_select: " + str(post_data))
            self.action_select_post_data_dic[post_data.goal_id] = post_data
            return fastapi.responses.JSONResponse(status_code=fastapi.status.HTTP_200_OK)

        @app.websocket("/status")
        async def websocket_endpoint(websocket: fastapi.WebSocket):
            try:
                await websocket.accept()
                self.get_logger().info("connetct: " + websocket.headers.get("sec-websocket-key"))
                while True:
                    await websocket.send_text(self.generate_status())
                    await asyncio.sleep(0.5)
            except Exception as e:
                self.get_logger().info("disconnected: " + str(e))

    def drawer_select_callback(self, goal_handle):
        goal_id_str = str(bytes(goal_handle.goal_id.uuid).hex())
        self.get_logger().info(f'drawer_select_callback: {goal_id_str}')
        self.drawer_select_goal_handle_dic[goal_id_str] = goal_handle.request

        result = SelectTask.Result()
        while goal_handle.is_active:
            if goal_handle.is_cancel_requested:
                # canceled
                self.get_logger().info('drawer_select: cancel request')
                goal_handle.canceled()
                break

            elif goal_id_str in self.drawer_select_post_data_dic.keys():     
                # receive post data
                self.get_logger().info('drawer_select: get post data')
                post_data = self.drawer_select_post_data_dic[goal_id_str]
                goal_handle.succeed()

                result.selected_key = post_data.key
                del self.drawer_select_post_data_dic[goal_id_str]
                break

            feedback_msg = SelectTask.Feedback()
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(0.5)
        else:
            self.get_logger().info('drawer_select: error')
            goal_handle.abort()

        del self.drawer_select_goal_handle_dic[goal_id_str]
        return result

    def action_select_callback(self, goal_handle):
        goal_id_str = str(bytes(goal_handle.goal_id.uuid).hex())
        self.get_logger().info(f'action_select_callback: {goal_id_str}')
        self.action_select_goal_handle_dic[goal_id_str] = goal_handle.request

        result = SelectTask.Result()
        while goal_handle.is_active:
            if goal_handle.is_cancel_requested:
                # canceled
                self.get_logger().info('action_select: cancel request')
                goal_handle.canceled()
                break

            elif goal_id_str in self.action_select_post_data_dic.keys():     
                # receive post data
                self.get_logger().info('action_select: get post data')
                post_data = self.action_select_post_data_dic[goal_id_str]
                goal_handle.succeed()

                result.selected_key = post_data.key
                del self.action_select_post_data_dic[goal_id_str]
                break

            feedback_msg = SelectTask.Feedback()
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(0.5)
        else:
            self.get_logger().info('action_select: error')
            goal_handle.abort()

        del self.action_select_goal_handle_dic[goal_id_str]
        return result

    def cancel_accept_callback(self, goal_handle):
        goal_id_str = str(bytes(goal_handle.goal_id.uuid).hex())
        self.get_logger().info(f'cancel_accept_callback: {goal_id_str}')
        return CancelResponse.ACCEPT

    def set_note_callback(self, request, response):
        ros_now = self.get_clock().now()
        self.note_data_dic[request.key] = [ros_now, request]
        response.success = True
        return response

    def timer_callback(self):
        ros_now = self.get_clock().now()
        delete_duration = 5.0
        delet_keys = [key for key, value in self.note_data_dic.items() if delete_duration * (10 ** 9) < (ros_now - value[0]).nanoseconds]
        for key in delet_keys:
            del self.note_data_dic[key]

    def generate_status(self):
        def generate_options(goal_handle_dic):
            output_list = []
            for goal_id, request in goal_handle_dic.items():
                for option in request.option_list:
                    output_list.append({"goal_id":goal_id, "key":option.key, "display_name":option.display_name})
            return output_list

        # drawer_select
        drawer_select_data = {}
        drawer_select_data["options"] = generate_options(self.drawer_select_goal_handle_dic)

        # action_select
        action_select_data = {}
        action_select_data["options"] = generate_options(self.action_select_goal_handle_dic)

        # display
        display_data = {}
        display_data["header"] = {}
        display_data["header"]["name"] = self.get_parameter('display.header.name').get_parameter_value().string_value
        display_data["header"]["color"] = self.get_parameter('display.header.color').get_parameter_value().string_value
        display_data["board"] = {}
        display_data["board"]["icon"] = self.get_parameter('display.board.icon').get_parameter_value().string_value
        display_data["board"]["message"] = self.get_parameter('display.board.message').get_parameter_value().string_value

        # note
        note_data = {}
        note_data["contents"] = []
        for item in self.note_data_dic.values():
            level = "none"
            if item[1].level == SetNote.Request.LEVEL_INFO:
                level = "info"
            elif item[1].level == SetNote.Request.LEVEL_WARNING:
                level_str = "warning"
            elif item[1].level == SetNote.Request.LEVEL_ERROR:
                level_str = "error"
            note_data["contents"].append({"level":level, "message":item[1].message})

        # output
        send_data = {}
        send_data["drawer_select"] = drawer_select_data
        send_data["action_select"] = action_select_data
        send_data["display"] = display_data
        send_data["note"] = note_data
        return json.dumps(send_data)

def main(argv=sys.argv):
    rclpy.init()
    node = Ros2ApiServer()

    executor = MultiThreadedExecutor()
    ros2_thread = threading.Thread(target=rclpy.spin, args=(node,), kwargs={"executor":executor}, daemon=True)
    ros2_thread.start()
    uvicorn.run(app, host="", port=8010, log_level="warning")

    node.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main(sys.argv)

実行

statusの受信

websocketで2Hzで流れてくるステータス情報を取得します。json形式の中身が見えます。

status情報の取得
pip3 install websockets
python3 -m websockets ws://localhost:8010/status
受信の例
{"drawer_select": {"options": []}, "action_select": {"options": []}, "display": {"header": {"name": "manual", "color": "gray"}, "board": {"icon": "pause", "message": "none"}}, "note": {"contents": []}}

drawer_select

まずros2_actionで選択し情報をsend_goalします。

goalの送信
ros2 action send_goal /api_server/select/drawer srs_simple_tablet_server_msgs/action/SelectTask '{"option_list":[{"key":"aaa"},{"key":"bbb"}]}'

こうするとstatusに選択し情報が増えます。

statusの受信値
{"drawer_select": {"options": [{"goal_id": "3d9e3fd0e5484fb98fda6499197686b9", "key": "aaa", "display_name": ""}, {"goal_id": "dbd5dd41683048ca900dc9657d5ec1fd", "key": "bbb", "display_name": ""}]}, "action_select": {"options": []}, "display": {"header": {"name": "manual", "color": "gray"}, "board": {"icon": "pause", "message": "none"}}, "note": {"contents": []}}

選択結果をpostします。

選択結果のpost
curl -X POST http://localhost:8010/select/drawer --data '{"key":"aaa", "goal_id":"3d9e3fd0e5484fb98fda6499197686b9"}' 

send_goalしていた画面に結果が返ります。

r$ ros2 action send_goal /api_server/select/drawer srs_simple_tablet_server_msgs/action/SelectTask '{"option_list":[{"key":"aaa"},{"key":"bbb"}]}'
Waiting for an action server to become available...
Sending goal:
     option_list:
- key: aaa
  display_name: ''
- key: bbb
  display_name: ''

Goal accepted with ID: 3d9e3fd0e5484fb98fda6499197686b9

Result:
    selected_key: aaa

Goal finished with status: SUCCEEDED

目次ページへのリンク

ROS2講座の目次へのリンク

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?