はじめに
この記事では、骨格検出したデータをROS2で取り扱う方法について紹介します。
基本的な方針としては、まず、デプスカメラから得られた画像データをYOLOv8によって処理し、画像上の骨格位置を検出します。次に、得られたピクセル座標をもとにポイントクラウドの3次元座標を取得します。最後に、各骨格データをPoseArray型のトピックを用いてパブリッシュします。
今回紹介する方法の実行環境は以下のとおりです。
項目 | スペック、バージョン |
---|---|
マシン | Intel NUC |
CPU | Core i5 |
OS | Ubuntu 22.04LTS |
ROS2 | Humble |
RealsenseSDK | 2.54.2.0 |
YOLO | v8 |
また、今回紹介するROS2パッケージは、以下のGitHubリポジトリにまとめてあります。
環境構築
本記事では、ROS2の環境構築およびRealsenseSDKの環境構築の説明については省略いたします。以下の記事を参考ください。
RealsenseをROS2で用いる実装では、realsense-rosなどのラッパーを用いる手法を主に見かけますが、今回はSDKを直接使うことで、Pointcloudやalignなどの機能を活用しようと思います。
実際に使用するのは、pyrealsenseというRealsenseSDKをPythonで記述できるようにしたライブラリです。以下のコマンドによってインストールします。
pip3 install pyrealsense2
続いて、骨格検出に用いるYOLOv8を導入します。以下のコマンドによってインストールします。
pip3 install ultralytics
筆者の場合、YOLOについてはインストールには少々時間がかかりました。
動作確認
簡単なスクリプトを用いて、YOLOの動作確認を行ってみましょう。
適当なディレクトリに'simple_pose.py
というPythonファイルを作成し、以下の内容を記述してみましょう。
from ultralytics import YOLO
model = YOLO('yolov8n-pose.pt')
#引数sourceの番号はデバイスIDなので適宜変更が必要
results = model(source=0, show=True, conf=0.3)
すると、以下の画像のような結果が得られると思います。これでひとまず動作確認は終了とします。
ROSノードの記述
以下のコードをノードとして実行することで、PoseArray型で各骨格の座標をパブリッシュすることができます。なお、ROS2のパッケージの設定等は記事上部に貼り付けたGithubのリポジトリを参照してください。
import pyrealsense2 as rs
from ultralytics import YOLO
import numpy as np
import cv2
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
from std_msgs.msg import MultiArrayDimension
from std_msgs.msg import Header
from geometry_msgs.msg import PoseArray
from geometry_msgs.msg import Point
from geometry_msgs.msg import Pose
from geometry_msgs.msg import Vector3
KEYPOINTS_NAMES = [
"nose", # 0
"eye(L)", # 1
"eye(R)", # 2
"ear(L)", # 3
"ear(R)", # 4
"shoulder(L)", # 5
"shoulder(R)", # 6
"elbow(L)", # 7
"elbow(R)", # 8
"wrist(L)", # 9
"wrist(R)", # 10
"hip(L)", # 11
"hip(R)", # 12
"knee(L)", # 13
"knee(R)", # 14
"ankle(L)", # 15
"ankle(R)", # 16
]
class PosePublisherClass(Node):
def __init__(self):
self.model = YOLO("yolov8n-pose.pt")
super().__init__('simple_face_dectation')
self.enableLog = False
# デプスとカラー画像の設定
self.pipeline = rs.pipeline()
self.config = rs.config()
self.pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)
self.pipeline_profile = self.config.resolve(self.pipeline_wrapper)
self.device = self.pipeline_profile.get_device()
ctx = rs.context() # Create librealsense context for managing devices
serials = []
if (len(ctx.devices) > 0):
for dev in ctx.devices:
print ('Found device: ', \
dev.get_info(rs.camera_info.name), ' ', \
dev.get_info(rs.camera_info.serial_number))
serials.append(dev.get_info(rs.camera_info.serial_number))
else:
print("No Intel Device connected")
self.found_rgb = False
for s in self.device.sensors:
if s.get_info(rs.camera_info.name) == 'RGB Camera':
self.found_rgb = True
self.arrayPublisher_ = self.create_publisher(PoseArray, 'body', 10)
break
if not self.found_rgb:
print("The demo requires Depth camera with Color sensor")
exit(0)
self.align_to = rs.stream.color
self.align = rs.align(self.align_to)
#シリアル番号を指定する
self.selialNumber = '048522074360'
self.config.enable_device(self.selialNumber)
self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
# ストリーミング開始
self.pipeline.start(self.config)
# ストリーミング情報の取得
self.profile = self.pipeline.get_active_profile()
self.depth_profile = rs.video_stream_profile(self.profile.get_stream(rs.stream.depth))
self.depth_intrinsics = self.depth_profile.get_intrinsics()
self.w, self.h = self.depth_intrinsics.width, self.depth_intrinsics.height
# ポイントクラウドの設定
self.pc = rs.pointcloud()
self.decimate = rs.decimation_filter()
self.decimate.set_option(rs.option.filter_magnitude, 2)
self.colorizer = rs.colorizer()
# ループ処理
timer_period = 0.03
self.timer = self.create_timer(timer_period, self.loop)
def loop(self):
self.msg = Vector3()
# フレーム入力待ち
self.frames = self.pipeline.wait_for_frames()
#フレーム入力時の処理
self.aligned_frames = self.align.process(self.frames)
self.depth_frame = self.aligned_frames.get_depth_frame()
self.color_frame = self.aligned_frames.get_color_frame()
self.depth_frame = self.decimate.process(self.depth_frame)
self.depth_intrinsics = rs.video_stream_profile(
self.depth_frame.profile).get_intrinsics()
self.w, self.h = self.depth_intrinsics.width, self.depth_intrinsics.height
#imageをnumpy arrayに
self.depth_image = np.asanyarray(self.depth_frame.get_data())
self.color_image = np.asanyarray(self.color_frame.get_data())
# ポイントクラウド関連処理
self.points = self.pc.calculate(self.depth_frame)
self.v= self.points.get_vertices()
self.verts = np.asanyarray(self.v).view(np.float32).reshape(-1, 3) # xyz
# リサイズ
self.color_image_s = cv2.resize(self.color_image, (self.w, self.h))
#YOLO処理
self.results = self.model(self.color_image_s, show=False, save=False)
self.names = self.results[0].names
self.classes = self.results[0].boxes.cls
self.boxes = self.results[0].boxes
self.annotatedFrame = self.results[0].plot()
self.keypoints = self.results[0].keypoints
self.id = 1
self.layoutArray = []
self.positionArray=[]
# 検出された人数分のキーポイントの処理を行うfor文
for kp in self.keypoints:
self.msg_array = Float32MultiArray()
if(kp.conf is not None):
if self.enableLog:
print("ID:",self.id)
kpxy=kp.xyn.tolist()
kpconf=kp.conf.tolist()
# キーポイントの座標の取得
for index in range(len(kpconf[0])):
self.dimInfo = MultiArrayDimension()
self.dimInfo.label = str(self.id) + "_" + KEYPOINTS_NAMES[index]
self.dimInfo.stride = 4
self.layoutArray.append(self.dimInfo)
self.target_x = int(kpxy[0][index][0] * self.w)
self.target_y = int(kpxy[0][index][1] * self.h)
#ポイントクラウド上の座標の取得
self.target_v = self.verts[self.target_x + (self.target_y - 1)* self.w]
self.poses = Pose()
self.poses.position.x = float(self.target_v[0])
self.poses.position.y = float(self.target_v[1])
self.poses.position.z = float(self.target_v[2])
self.poses.orientation.x = float(kpconf[0][index])
self.positionArray.append(self.poses)
if self.enableLog:
print(self.id, ":world position:" + KEYPOINTS_NAMES[index] + ":", self.target_v)
self.headerInfo = Header()
self.headerInfo.frame_id = str(self.id)
self.msg_array = PoseArray(header=self.headerInfo, poses=self.positionArray)
self.arrayPublisher_.publish(self.msg_array)
self.id += 1
else:
print("Keypoints is NOT detecting.")
def main(args=None):
rclpy.init(args=args)
publisher = PosePublisherClass()
rclpy.spin(publisher)
publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
簡単なコード解説
ノード起動処理
まず、PosePublisherClass
のデフォルトコンストラクタにて、YOLOのモデルの設定、Realsenseの起動処理、タイマーコールバックself.loop()
の呼び出しを行います。
self.model = YOLO("yolov8n-pose.pt")
この一文でYOLOのモデルを読み込ませていますが、この場合では、モデルデータyolov8n-pose.pt
は、ソースコードと同じディレクトリ階層に置かれる必要があるので注意してください。
self.pc = rs.pointcloud()
96行目ではポイントクラウドの機能を初期化しています。デプスのrawデータから視野角などを考慮した3次元座標変換のために必要となります。
ループ処理
self.loop()
では、RealsenseからのデータをもとにYOLOによる骨格推定とその結果をPoseArray型に変換する処理を行っています。
self.results = self.model(self.color_image_s, show=False, save=False)
YOLOから得られた結果は、このself.results
に集約されています。
self.target_x = int(kpxy[0][index][0] * self.w)
self.target_y = int(kpxy[0][index][1] * self.h)
#ポイントクラウド上の座標の取得
self.target_v = self.verts[self.target_x + (self.target_y - 1)* self.w]
165行目あたりでは、実際の三次元ワールド座標を取得する処理を行っています。self.target_x
およびself.target_y
は、2次元の画素の位置情報を取得しています。また、kpxy[0][index][0]
やkpxy[0][index][1]
は、推定された骨格位置の画素上の正規化した値を持っているため、それぞれにフレームのタテヨコの画素数を掛け算して、Intで丸めています。
self.verts
は三次元ワールド座標を1次元の配列で格納しており、二次元のXY座標をself.target_x + (self.target_y - 1)* self.w
のようにして表現することで1次元配列からほしい値を取得します。
self.poses.orientation.x = float(kpconf[0][index])
176行目のこの一文は、察しの良い方は気持ち悪い代入のように思われるかもしれません。
ここでは、値の信頼度confidence
を、わざとorientation
に入れています(ほかに入れるスキがなかったので。。。)。信頼度をデータに含めた理由としては、サブスクライバー側の処理で使用する信頼度とコンテンツが操作できるようにするためです。深い意味はありません。
self.headerInfo = Header()
self.headerInfo.frame_id = str(self.id)
self.msg_array = PoseArray(header=self.headerInfo, poses=self.positionArray)
self.arrayPublisher_.publish(self.msg_array)
183行目あたりでは、得られた値をROS2で使えるようにしています。
ROS2で配列形式の型を送信する際には、予めNumpy形式で定義した配列をインスタンス引数に入れます。今回では、poses=self.positionArray
のような形式をとっています。また、ヘッダー情報についても同様の形式で入れておきます。ヘッダー情報には、ユーザのIDself.id
の値を入れておくことで、複数人が映り込んでいる場合にも対応させるようにしました。
ビルドと実行
まずは、ワークスペースに入ってすぐのディレクトリまで移動し、ビルドを済ませます。
cd ~/(Your workspace directry)
colcon build --symlink-install
うまくビルドが成功したら早速実行してみます。
cd ~/(Your workspace directry)
. install/setup.bash
ros2 run ros2_realsense_yolo pose_publisher
実行するとコンソールに以下のような表示が行われます。
ID: 1
1 :world position:nose: [ -0 0 0]
1 :world position:eye(L): [ -0 0 0]
1 :world position:eye(R): [ -0 0 0]
1 :world position:ear(L): [ -0 0 0]
1 :world position:ear(R): [ -0 0 0]
1 :world position:shoulder(L): [ 0.16924 -0.11131 0.529]
1 :world position:shoulder(R): [ -0.11614 -0.11667 0.658]
1 :world position:elbow(L): [ 0.23814 0.076012 0.484]
1 :world position:elbow(R): [ -0.19813 0.11511 0.585]
1 :world position:wrist(L): [ 0.083401 0.085643 0.378]
1 :world position:wrist(R): [ -0.19948 0.11395 0.589]
1 :world position:hip(L): [ -0 0 0]
1 :world position:hip(R): [ -0 0 0]
1 :world position:knee(L): [ -0 0 0]
1 :world position:knee(R): [ -0 0 0]
1 :world position:ankle(L): [ -0 0 0]
1 :world position:ankle(R): [ -0 0 0]
このログの場合、左右の型、肘、手首の値が取得できていることになります。
発展
他の記事でも紹介したUnity-Robotics-Hubを経由してUnity上で推定された骨格を復元することももちろんできます。これにより、デプスカメラがロボットに取り付けられたような処理系と、仮想空間の生成を行う処理系とが分散している場合でもROS2を介して人体のモーショントラッキングができるようになります。
なお、ROS2とUnityを連携することについて紹介した過去の記事はこちらです。ぜひ合わせてお読みください
参考