はじめに
カメラで姿勢を推定を行い,TURTLEBOT3を操作するインターフェースをROS 2上で作成しました.
環境
Raspberry Pi
デバイス | Raspberry Pi 4B |
メモリ | 4GB |
ロボット用PC
デバイス | dynabook Satellite R35/M |
プロセッサ | intel CORE i5-4210U |
メモリ | 16GB |
姿勢推定用PC
デバイス | MacBook Pro |
プロセッサ | M1 Pro |
メモリ | 16GB |
ロボット
ロボット | TURTLEBOT 3 WAFFLE PI |
ROS
Distribution | foxy |
プログラミング言語 | python 3.x |
システム構成
teleopの仕組みをもとにして作成しています.publisherがキー入力だったのを,姿勢推定で得られるキーポイントを評価したのものに変えただけです.
プログラム
Raspberry Pi
turtlebot3_bringupパッケージからrobot.launch.pyという起動スクリプト(launch file)をそのまま利用する.
ロボット用PC
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import sys, select, os
if os.name == 'nt':
import msvcrt
else:
import tty, termios
import socket
BURGER_MAX_LIN_VEL = 0.22
BURGER_MAX_ANG_VEL = 2.84
WAFFLE_MAX_LIN_VEL = 0.26
WAFFLE_MAX_ANG_VEL = 1.82
LIN_VEL_STEP_SIZE = 0.01
ANG_VEL_STEP_SIZE = 0.1
# サーバー側の設定
SERVER_HOST = '0.0.0.0' # すべてのアドレスからの接続を受け付ける
SERVER_PORT = 50001 # ポート番号
msg = """
Control Your TurtleBot3!
---------------------------
Moving around:
w
a s d
x
w/x : increase/decrease linear velocity (Burger : ~ 0.22, Waffle and Waffle Pi : ~ 0.26)
a/d : increase/decrease angular velocity (Burger : ~ 2.84, Waffle and Waffle Pi : ~ 1.82)
space key, s : force stop
CTRL-C to quit
"""
e = """
Communications Failed
"""
def get_remote_key():
global server_socket
server_socket.listen(1)
print("Waiting for connection on {}:{}".format(SERVER_HOST, SERVER_PORT))
client_socket, addr = server_socket.accept()
print("Connection from: ", addr)
with client_socket:
key = client_socket.recv(1024).decode('utf-8')
return key
# サーバーソケットをグローバルスコープで1回だけ作成
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((SERVER_HOST, SERVER_PORT))
def vels(target_linear_vel, target_angular_vel):
return "currently:\tlinear vel %s\t angular vel %s " % (target_linear_vel,target_angular_vel)
def makeSimpleProfile(output, input, slop):
if input > output:
output = min( input, output + slop )
elif input < output:
output = max( input, output - slop )
else:
output = input
return output
def constrain(input, low, high):
if input < low:
input = low
elif input > high:
input = high
else:
input = input
return input
def checkLinearLimitVelocity(vel):
if turtlebot3_model == "burger":
vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)
elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
vel = constrain(vel, -WAFFLE_MAX_LIN_VEL, WAFFLE_MAX_LIN_VEL)
else:
vel = constrain(vel, -BURGER_MAX_LIN_VEL, BURGER_MAX_LIN_VEL)
return vel
def checkAngularLimitVelocity(vel):
if turtlebot3_model == "burger":
vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)
elif turtlebot3_model == "waffle" or turtlebot3_model == "waffle_pi":
vel = constrain(vel, -WAFFLE_MAX_ANG_VEL, WAFFLE_MAX_ANG_VEL)
else:
vel = constrain(vel, -BURGER_MAX_ANG_VEL, BURGER_MAX_ANG_VEL)
return vel
def main(args=None):
rclpy.init(args=args)
node = Node('pose_teleop_bot')
pub = node.create_publisher(Twist, 'cmd_vel', 10)
turtlebot3_model = node.declare_parameter('model', 'waffle').value
status = 0
target_linear_vel = 0.0
target_angular_vel = 0.0
control_linear_vel = 0.0
control_angular_vel = 0.0
try:
print(msg)
while rclpy.ok():
rclpy.spin_once(node, timeout_sec=0.1)
key = get_remote_key()
if key == 'w' :
status = status + 1
target_linear_vel = 0.26/2 # 線形速度の目標値を設定
target_angular_vel = 0.0
print(vels(target_linear_vel,target_angular_vel))
elif key == 'x' :
status = status + 1
target_linear_vel = -0.26/2 # 線形速度の目標値を設定
target_angular_vel = 0.0
print(vels(target_linear_vel,target_angular_vel))
elif key == 'a' :
status = status + 1
target_linear_vel = 0.0
target_angular_vel = -1.82/2 # 角速度の目標値を設定
print(vels(target_linear_vel,target_angular_vel))
elif key == 'd' :
status = status + 1
target_linear_vel = 0.0
target_angular_vel = 1.82/2 # 角速度の目標値を設定
print(vels(target_linear_vel,target_angular_vel))
elif key == ' ' or key == 's' :
target_linear_vel = 0.0
control_linear_vel = 0.0
target_angular_vel = 0.0
control_angular_vel = 0.0
print(vels(target_linear_vel, target_angular_vel))
else:
if (key == '\x03'):
break
if status == 20 :
print(msg)
status = 0
twist = Twist()
control_linear_vel = makeSimpleProfile(control_linear_vel, target_linear_vel, (LIN_VEL_STEP_SIZE/2.0))
twist.linear.x = control_linear_vel; twist.linear.y = 0.0; twist.linear.z = 0.0
control_angular_vel = makeSimpleProfile(control_angular_vel, target_angular_vel, (ANG_VEL_STEP_SIZE/2.0))
twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = control_angular_vel
pub.publish(twist)
except Exception as e:
node.get_logger().error('例外が発生しました: ' + str(e))
finally:
twist = Twist()
twist.linear.x = 0.0; twist.linear.y = 0.0; twist.linear.z = 0.0
twist.angular.x = 0.0; twist.angular.y = 0.0; twist.angular.z = 0.0
pub.publish(twist)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
if os.name != 'nt':
settings = termios.tcgetattr(sys.stdin)
main()
if os.name != 'nt':
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
姿勢推定用PC
pose_teleop.py
from ultralytics import YOLO
import torch
import cv2
import numpy as np
import socket
import time
# ポーズ
model = YOLO('yolov8n-pose.pt') # ポーズの学習済みモデルは用意する
# 適宜、手と肩のキーポイントのインデックスを設定
right_wrist_index = 10
left_wrist_index = 9
right_shoulder_index = 6
left_shoulder_index = 5
# キーポイントの信頼度スコアが高いかどうかを確認する閾値
confidence_threshold = 0.1
# クライアント側の設定
client_socket = None
SERVER_HOST = '10.42.1.1' # サーバーのIPアドレスまたはホスト名(これは自分で用意する)
SERVER_PORT = 50001 # サーバー側で設定したポート番号
# OpenCVを使用してWebカメラを開く
cap = cv2.VideoCapture(0)
def initialize_socket():
global client_socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((SERVER_HOST, SERVER_PORT))
def send_key(key):
global client_socket
try:
if client_socket is None:
initialize_socket()
client_socket.sendall(key.encode())
except ConnectionRefusedError as e:
print(f"接続が拒否されました: {e}")
client_socket = None # 次回試行時に再接続するために None に設定
except socket.error as e:
print(f"ソケットエラーが発生しました: {e}")
client_socket.close()
client_socket = None # ソケットを閉じて再接続する準備をする
except Exception as e:
print(f"予期せぬエラーが発生しました: {e}")
client_socket.close()
client_socket = None # ソケットを閉じて再接続する準備をする
# メインループ開始前にソケットを初期化
initialize_socket()
while True:
# カメラからフレームを読み込む
ret, frame = cap.read()
if ret:
frame = cv2.flip(frame, 1)
if not ret:
break
# 姿勢検出
results = model(frame)
# Visualize the results on the frame
annotated_frame = results[0].plot()
# 姿勢分析結果のキーポイントを取得する
keypoints = results[0].keypoints
# 姿勢分析結果のキーポイントを取得する
keypoints_data = keypoints.data[0] # 1人の個体に関するキーポイントのデータを取得
# 各キーポイントの座標と信頼度を出力
for i, (x, y, conf) in enumerate(keypoints_data):
# 信頼度が一定の閾値より高いキーポイントのみを表示
if conf > 0.6: # 信頼度の閾値は調整可能です
print(f"キーポイント {i}: x座標={x}, y座標={y}, 信頼度={conf}")
# 右手と左手のキーポイントが有効かどうかを確認し、条件に応じてテキストを更新
right_hand_text = "RIGHT HAND: .."
left_hand_text = "LEFT HAND : .."
right_state = 0
left_state = 0
right_crossed = False
left_crossed = False
# 右手のキーポイントが検出されているか確認
if keypoints_data.shape[0] > right_wrist_index and keypoints_data[right_wrist_index][2] > confidence_threshold:
# 右手が挙げられているかどうか
if keypoints_data[right_wrist_index][1] < keypoints_data[right_shoulder_index][1]:
right_hand_text = "LEFT HAND : ^"
print("RIGHT HAND: RAISED")
right_state = 1
else:
right_hand_text = "LEFT HAND : .."
print("RIGHT HAND: NOT RAISED")
# 左手のキーポイントが検出されているか確認
if keypoints_data.shape[0] > left_wrist_index and keypoints_data[left_wrist_index][2] > confidence_threshold:
# 左手が挙げられているかどうか
if keypoints_data[left_wrist_index][1] < keypoints_data[left_shoulder_index][1]:
left_hand_text = "RIGHT HAND : ^"
print("LEFT HAND : RAISED")
left_state = 1
else:
left_hand_text = "RIGHT HAND : .."
print("LEFT HAND : NOT RAISED")
# 右手がクロスしているかどうか
if keypoints_data.shape[0] > right_wrist_index and keypoints_data[right_wrist_index][2] > confidence_threshold:
if keypoints_data[right_wrist_index][0] < keypoints_data[left_shoulder_index][0]:
print("RIGHT HAND: CROSSED")
right_crossed = False
else:
right_crossed = True
# 左手がクロスしているかどうか
if keypoints_data.shape[0] > left_wrist_index and keypoints_data[left_wrist_index][2] > confidence_threshold:
if keypoints_data[left_wrist_index][0] > keypoints_data[right_shoulder_index][0]:
print("LEFT HAND: CROSSED")
left_crossed = False
else:
left_crossed = True
# テキストのフォントとスケールを設定
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 3.0
color = (0, 255, 0) # 白色
thickness = 7 # 線の太さ
if right_state == 1 and left_state == 1:
send_key('w')
cv2.putText(annotated_frame, "GO!!", (900, 220), font, 9.0, color, 30)
elif right_state == 1 and left_state == 0:
send_key('d')
cv2.putText(annotated_frame, "<---", (900, 220), font, 9.0, color, 30)
elif right_state == 0 and left_state == 1:
send_key('a')
cv2.putText(annotated_frame, "--->", (900, 220), font, 9.0, color, 30)
elif right_crossed or left_crossed:
send_key('x')
cv2.putText(annotated_frame, "Back", (900, 220), font, 9.0, color, 30)
else:
send_key('s')
cv2.putText(annotated_frame, "STOP", (900, 220), font, 9.0, color, 30)
# 右手のテキストを描画
cv2.putText(annotated_frame, right_hand_text, (10, 90), font, font_scale, color, thickness)
# 左手のテキストを描画
cv2.putText(annotated_frame, left_hand_text, (10, 180), font, font_scale, color, thickness)
# cv2でフレームを表示
cv2.imshow("frame", annotated_frame)
# 'q'キーが押されたらループを抜ける
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# プログラムの終了時にソケットを閉じる
if client_socket is not None:
client_socket.close()
# 終了処理
cap.release()
cv2.destroyAllWindows()
実行
Raspberry Pi
export TURTLEBOT3_MODEL=waffle_pi
ros2 launch turtlebot3_bringup robot.launch.py
ロボット用PC
export TURTLEBOT3_MODEL=waffle
colcon build
source install/setup.bash
ros2 run package_name node_name
姿勢推定用PC
python pose_teleop.py
おわりに
身体を使ってロボットを操作する体験は幼い子どもたちにも釘付けでした.感覚的には警備員がオーライしているようなのと近い.