1
0

TURTLEBOT3 Waffle Pi 姿勢操作

Posted at

はじめに

カメラで姿勢を推定を行い,TURTLEBOT3を操作するインターフェースをROS 2上で作成しました.

環境

Raspberry Pi

デバイス Raspberry Pi 4B
メモリ 4GB

ロボット用PC

デバイス dynabook Satellite R35/M
プロセッサ intel CORE i5-4210U
メモリ 16GB

姿勢推定用PC

デバイス MacBook Pro
プロセッサ M1 Pro
メモリ 16GB

ロボット

ロボット TURTLEBOT 3 WAFFLE PI

ROS

Distribution foxy
プログラミング言語 python 3.x

システム構成

teleopの仕組みをもとにして作成しています.publisherがキー入力だったのを,姿勢推定で得られるキーポイントを評価したのものに変えただけです.

Screenshot 2024-04-15 at 2.11.23.png

プログラム

Raspberry Pi

turtlebot3_bringupパッケージからrobot.launch.pyという起動スクリプト(launch file)をそのまま利用する.

ロボット用PC

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys, select, os
if os.name == 'nt':
    import msvcrt
else:
    import tty, termios

import socket


BURGER_MAX_LIN_VEL = 0.22
BURGER_MAX_ANG_VEL = 2.84

WAFFLE_MAX_LIN_VEL = 0.26
WAFFLE_MAX_ANG_VEL = 1.82

LIN_VEL_STEP_SIZE = 0.01
ANG_VEL_STEP_SIZE = 0.1

# サーバー側の設定
SERVER_HOST = '0.0.0.0'  # すべてのアドレスからの接続を受け付ける
SERVER_PORT = 50001      # ポート番号

msg = """
Control Your TurtleBot3!
---------------------------
Moving around:
        w
   a    s    d
        x

w/x : increase/decrease linear velocity (Burger : ~ 0.22, Waffle and Waffle Pi : ~ 0.26)
a/d : increase/decrease angular velocity (Burger : ~ 2.84, Waffle and Waffle Pi : ~ 1.82)

space key, s : force stop

CTRL-C to quit
"""

e = """
Communications Failed
"""

def get_remote_key():
    global server_socket
    server_socket.listen(1)
    print("Waiting for connection on {}:{}".format(SERVER_HOST, SERVER_PORT))
    client_socket, addr = server_socket.accept()
    print("Connection from: ", addr)
    with client_socket:
        key = client_socket.recv(1024).decode('utf-8')
        return key

# サーバーソケットをグローバルスコープで1回だけ作成
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((SERVER_HOST, SERVER_PORT))


def vels(target_linear_vel, target_angular_vel):
    return "currently:\tlinear vel %s\t angular vel %s " % (target_linear_vel,target_angular_vel)

def makeSimpleProfile(output, input, slop):
    if input > output:
        output = min( input, output + slop )
    elif input < output:
        output = max( input, output - slop )
    else:
        output = input

    return output

def constrain(input, low, high):
    if input < low:
      input = low
    elif input > high:
      input = high
    else:
      input = input

    return input

def checkLinearLimitVelocity(vel):
    if turtlebot3_model == "burger":
      vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)
    elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
      vel = constrain(vel, -WAFFLE_MAX_LIN_VEL, WAFFLE_MAX_LIN_VEL)
    else:
      vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)

    return vel

def checkAngularLimitVelocity(vel):
    if turtlebot3_model == "burger":
      vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)
    elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
      vel = constrain(vel, -WAFFLE_MAX_ANG_VEL, WAFFLE_MAX_ANG_VEL)
    else:
      vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)

    return vel

def main(args=None):
    rclpy.init(args=args)
    node = Node('pose_teleop_bot')
    pub = node.create_publisher(Twist, 'cmd_vel', 10)
    turtlebot3_model = node.declare_parameter('model', 'waffle').value

    status = 0
    target_linear_vel = 0.0
    target_angular_vel = 0.0
    control_linear_vel = 0.0
    control_angular_vel = 0.0

    try:
        print(msg)
        while rclpy.ok():
            rclpy.spin_once(node, timeout_sec=0.1)

            key = get_remote_key()

            if key == 'w' :
                status = status + 1
                target_linear_vel   = 0.26/2 # 線形速度の目標値を設定
                target_angular_vel  = 0.0
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'x' :
                status = status + 1
                target_linear_vel   = -0.26/2 # 線形速度の目標値を設定
                target_angular_vel  = 0.0
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'a' :
                status = status + 1
                target_linear_vel   = 0.0
                target_angular_vel  = -1.82/2 # 角速度の目標値を設定
                print(vels(target_linear_vel,target_angular_vel))
            elif key == 'd' :
                status = status + 1
                target_linear_vel   = 0.0
                target_angular_vel  = 1.82/2 # 角速度の目標値を設定
                print(vels(target_linear_vel,target_angular_vel))
            elif key == ' ' or key == 's' :
                target_linear_vel   = 0.0
                control_linear_vel  = 0.0
                target_angular_vel  = 0.0
                control_angular_vel = 0.0
                print(vels(target_linear_vel, target_angular_vel))
            else:
                if (key == '\x03'):
                    break

            if status == 20 :
                print(msg)
                status = 0

            twist = Twist()

            control_linear_vel = makeSimpleProfile(control_linear_vel, target_linear_vel, (LIN_VEL_STEP_SIZE/2.0))
            twist.linear.x = control_linear_vel; twist.linear.y = 0.0; twist.linear.z = 0.0

            control_angular_vel = makeSimpleProfile(control_angular_vel, target_angular_vel, (ANG_VEL_STEP_SIZE/2.0))
            twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = control_angular_vel

            pub.publish(twist)

    except Exception as e:
        node.get_logger().error('例外が発生しました: ' + str(e))

    finally:
        twist = Twist()
        twist.linear.x = 0.0; twist.linear.y = 0.0; twist.linear.z = 0.0
        twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = 0.0
        pub.publish(twist)

        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    if os.name != 'nt':
        settings = termios.tcgetattr(sys.stdin)

    main()

    if os.name != 'nt':
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

姿勢推定用PC

pose_teleop.py
from ultralytics import YOLO
import torch
import cv2
import numpy as np
import socket
import time

# ポーズ
model = YOLO('yolov8n-pose.pt') # ポーズの学習済みモデルは用意する

# 適宜、手と肩のキーポイントのインデックスを設定
right_wrist_index = 10
left_wrist_index = 9
right_shoulder_index = 6
left_shoulder_index = 5

# キーポイントの信頼度スコアが高いかどうかを確認する閾値
confidence_threshold = 0.1

# クライアント側の設定
client_socket = None
SERVER_HOST = '10.42.1.1'  # サーバーのIPアドレスまたはホスト名(これは自分で用意する)
SERVER_PORT = 50001                  # サーバー側で設定したポート番号

# OpenCVを使用してWebカメラを開く
cap = cv2.VideoCapture(0)

def initialize_socket():
    global client_socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.connect((SERVER_HOST, SERVER_PORT))

def send_key(key):
    global client_socket
    try:
        if client_socket is None:
            initialize_socket()
        client_socket.sendall(key.encode())
    except ConnectionRefusedError as e:
        print(f"接続が拒否されました: {e}")
        client_socket = None  # 次回試行時に再接続するために None に設定
    except socket.error as e:
        print(f"ソケットエラーが発生しました: {e}")
        client_socket.close()
        client_socket = None  # ソケットを閉じて再接続する準備をする
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")
        client_socket.close()
        client_socket = None  # ソケットを閉じて再接続する準備をする

# メインループ開始前にソケットを初期化
initialize_socket()

while True:
    # カメラからフレームを読み込む
    ret, frame = cap.read()

    if ret:
        frame = cv2.flip(frame, 1)

    if not ret:
        break

    # 姿勢検出
    results = model(frame)

    # Visualize the results on the frame
    annotated_frame = results[0].plot()

    # 姿勢分析結果のキーポイントを取得する
    keypoints = results[0].keypoints

    # 姿勢分析結果のキーポイントを取得する
    keypoints_data = keypoints.data[0]  # 1人の個体に関するキーポイントのデータを取得

    # 各キーポイントの座標と信頼度を出力
    for i, (x, y, conf) in enumerate(keypoints_data):
        # 信頼度が一定の閾値より高いキーポイントのみを表示
        if conf > 0.6:  # 信頼度の閾値は調整可能です
            print(f"キーポイント {i}: x座標={x}, y座標={y}, 信頼度={conf}")

    # 右手と左手のキーポイントが有効かどうかを確認し、条件に応じてテキストを更新
    right_hand_text = "RIGHT HAND: .."
    left_hand_text = "LEFT HAND : .."
    right_state = 0
    left_state = 0
    right_crossed = False
    left_crossed = False

    # 右手のキーポイントが検出されているか確認
    if keypoints_data.shape[0] > right_wrist_index and keypoints_data[right_wrist_index][2] > confidence_threshold:
        # 右手が挙げられているかどうか
        if keypoints_data[right_wrist_index][1] < keypoints_data[right_shoulder_index][1]:
            right_hand_text = "LEFT HAND : ^"
            print("RIGHT HAND: RAISED")
            right_state = 1
        else:
            right_hand_text = "LEFT HAND : .."
            print("RIGHT HAND: NOT RAISED")

    # 左手のキーポイントが検出されているか確認
    if keypoints_data.shape[0] > left_wrist_index and keypoints_data[left_wrist_index][2] > confidence_threshold:
        # 左手が挙げられているかどうか
        if keypoints_data[left_wrist_index][1] < keypoints_data[left_shoulder_index][1]:
            left_hand_text = "RIGHT HAND : ^"
            print("LEFT HAND : RAISED")
            left_state = 1
        else:
            left_hand_text = "RIGHT HAND : .."
            print("LEFT HAND : NOT RAISED")

    # 右手がクロスしているかどうか
    if keypoints_data.shape[0] > right_wrist_index and keypoints_data[right_wrist_index][2] > confidence_threshold:
        if keypoints_data[right_wrist_index][0] < keypoints_data[left_shoulder_index][0]:
            print("RIGHT HAND: CROSSED")
            right_crossed = False
        else:
            right_crossed = True

    # 左手がクロスしているかどうか
    if keypoints_data.shape[0] > left_wrist_index and keypoints_data[left_wrist_index][2] > confidence_threshold:
        if keypoints_data[left_wrist_index][0] > keypoints_data[right_shoulder_index][0]:
            print("LEFT HAND: CROSSED")
            left_crossed = False
        else:
            left_crossed = True

    # テキストのフォントとスケールを設定
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 3.0
    color = (0, 255, 0)  # 白色
    thickness = 7  # 線の太さ

    if right_state == 1 and left_state == 1:
        send_key('w')
        cv2.putText(annotated_frame, "GO!!", (900, 220), font, 9.0, color, 30)
    elif right_state == 1 and left_state == 0:
        send_key('d')
        cv2.putText(annotated_frame, "<---", (900, 220), font, 9.0, color, 30)
    elif right_state == 0 and left_state == 1:
        send_key('a')
        cv2.putText(annotated_frame, "--->", (900, 220), font, 9.0, color, 30)
    elif right_crossed or left_crossed:
        send_key('x')
        cv2.putText(annotated_frame, "Back", (900, 220), font, 9.0, color, 30)
    else:
        send_key('s')
        cv2.putText(annotated_frame, "STOP", (900, 220), font, 9.0, color, 30)



    # 右手のテキストを描画
    cv2.putText(annotated_frame, right_hand_text, (10, 90), font, font_scale, color, thickness)

    # 左手のテキストを描画
    cv2.putText(annotated_frame, left_hand_text, (10, 180), font, font_scale, color, thickness)

    # cv2でフレームを表示
    cv2.imshow("frame", annotated_frame)

    # 'q'キーが押されたらループを抜ける
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# プログラムの終了時にソケットを閉じる
if client_socket is not None:
    client_socket.close()

# 終了処理
cap.release()
cv2.destroyAllWindows()

実行

Raspberry Pi

export TURTLEBOT3_MODEL=waffle_pi
ros2 launch turtlebot3_bringup robot.launch.py

ロボット用PC

export TURTLEBOT3_MODEL=waffle
colcon build
source install/setup.bash
ros2 run package_name node_name

姿勢推定用PC

python pose_teleop.py

おわりに

身体を使ってロボットを操作する体験は幼い子どもたちにも釘付けでした.感覚的には警備員がオーライしているようなのと近い.

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0