以前、Unity上の箱庭ロボットの強化学習を紹介しましたが、今度は、これを ChatGPT の API 使ってやってみましたので、やり方などをご紹介します。
ただ、今回はあまり詳細の解説はできませんので、もし興味ある方おられましたら、箱庭イベントに参加頂ければご説明しようかと思います~。
ロボットのルール
ChatGPTに与えるロボットのルールは以下のようにしました。
色んなやり方があるかと思いますが、『箱庭のロボットの仕様を書き連ねて、障害物にぶつからないようにうまく動いてねー』といった思いでルール化した感じです。
Rule1: あなたはロボットです。与えられた環境で自由に移動してください。
Rule2: ずっと同じ場所にとどまらず、動き続けてください。停止してはいけません。
Rule3: 障害物にぶつかってはいけません。
Rule4: 障害物との距離は、3方向(前、右、左)からセンサ値を取得できます。
Rule5: 3方向(前、右、左)のいずれかのセンサ値が、
0.5以下の場合は衝突直前と判断して回避行動をとってください。
回避する場合は、前進してはいけません。
Rule6: 移動するためのモーターがあります。
モーター指示値としては、前進と回転の移動が可能です。
Rule7:前進する場合のパラメータ値は、0.0~50.0の範囲となります。
値が大きいほど早く進みます。値が0の場合は停止します。
Rule8: 回転する場合のパラメータ値は、-10.0~10.0の範囲となります。
正の値は、左回転、負の値は、右回転です。値が大きいほど回転速度が高くなります。
値が0の場合は回転しません。
Rule9: 回転する場合は、前進のモータ指示値は必ず0.0にしてください。
Rule10: モーター指示値が、前進、回転ともに0の場合、移動していないとみなします。
Rule11: モーターの指示値は以下の書式で1データのみ出力してください。
理由およびコメントの出力は不要です。
{ "x": (value of x according to Rule7), "z": (value of z according to Rule9)}
Rule12: 障害物までの距離は、以下の書式で入力します。
d_f=<前方向にある障害物までの距離>
d_r=<右方向の障害物までの距離>
d_l=<左方向の障害物までの距離>
Rule13: Rule12の入力が与えられたら、Rule1~Rule10のルールに従って、Rule11の出力をしてください。
Rule14: 障害物回避する場合は、停止してはいけません。回転して回避することを推奨します。
ちなみに、ルールは英語に変換して使うことにししました(ChatGPTに翻訳してもらいました)。
※日本語だとトークン数が増えるような気がしたのと、曖昧さが減るかな?と思って。
英語版はこちら:
Rule1: You are a robot. Move freely in the given environment.
Rule2: Keep moving and don't stay in the same place. Do not stop.
Rule3: Do not collide with obstacles.
Rule4: You can obtain sensor values for obstacle distance from three directions (front, right, left).
Rule5: If the distance to an obstacle in any of the three directions (front, right, left) is 0.5 or less, assume that a collision is imminent and take evasive action. If you need to avoid obstacles, do not move forward.
Rule6: You have motors for movement. You can move forward and rotate as motor instructions.
Rule7: The parameter value for moving forward ranges from 0.0 to 50.0. The larger the value, the faster you move. If the value is 0, you will stop.
Rule8: The parameter value for rotation ranges from -10.0 to 10.0. Positive values indicate left rotation, and negative values indicate right rotation. The larger the value, the faster the rotation speed. If the value is 0, you will not rotate.
Rule9: If you are rotating, set the motor instruction value for forward movement to 0.0.
Rule10: If the motor instruction values for both forward movement and rotation are 0, consider that you are not moving.
Rule11: Output only one data for motor instructions in the following format. Do not output reasons or comments.
{ "x": (value of x according to Rule7), "z": (value of z according to Rule9)}
Rule12: Input the distance to obstacles in the following format:
d_f=<distance to an obstacle in front>
d_r=<distance to an obstacle on the right>
d_l=<distance to an obstacle on the left>
Rule13: If input for Rule12 is given, follow Rules 1-10 and output Rule 11.
Rule14: If you need to avoid obstacles, do not stop. It is recommended to rotate to avoid obstacles.
Pythonプログラム
作成したコードは以下の通りです。ポイントとしては、ChatGPTのAPIコールは遅い(数秒以上かかる)ので、別スレッドで非同期化し、2秒単位でロボット指示値をもらう設計にしてます。応答が返るまでは前回値で動きます。
※補足:毎回、ルールとセンサ値を渡すのは冗長な気がしていますが、仕方ないですね。。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import sys
from hako_binary import offset_map
from hako_binary import binary_writer
from hako_binary import binary_reader
import qtable_model2
import hako_env
import hako
import time
import signal
from types import MethodType
import hako_robomodel_any
import struct
import threading
import openai
import re
def handler(signum, frame):
print(f'SIGNAL(signum={signum})')
sys.exit(0)
print("START TB3 TEST")
# signal.SIGALRMのシグナルハンドラを登録
signal.signal(signal.SIGINT, handler)
#create hakoniwa env
env = hako_env.make("SampleRobo", "any", "dev/ai/custom.json")
print("WAIT START:")
env.hako.wait_event(hako.HakoEvent['START'])
print("WAIT RUNNING:")
env.hako.wait_state(hako.HakoState['RUNNING'])
print("GO:")
#do simulation
def delta_usec():
return 20000
robo = env.robo()
robo.delta_usec = delta_usec
# OpenAI APIキーを設定する
openai.api_key = "OPENAI API KEY"
rule = None
with open("dev/ai/rule.txt", "r") as file:
rule = file.read()
print(rule)
# OpenAI APIでモデルを指定して応答を取得する
def get_response(rule, query_data):
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": rule + "\n" + query_data }
]
)
return response["choices"][0]["message"]["content"]
shared_dict = {
'd_f': 3.5,
'd_l': 3.5,
'd_r': 3.5,
'l_x': "0.0",
'a_z': "0.0"
}
lock = threading.Lock()
def thread_func():
value = "50.0"
global shared_dict
while True:
time.sleep(2.0)
d_f = shared_dict.get('d_f', 3.5)
d_r = shared_dict.get('d_r', 3.5)
d_l = shared_dict.get('d_l', 3.5)
input_data = "d_f="+str(d_f)+", d_r="+str(d_r)+ ", d_l="+str(d_l)
print("INPUT DATA: scan_min: " + input_data)
response = get_response(rule, input_data)
print("ChatGPT REPLY: " + response)
s = response
match = re.search(r"\{[^}]+\}", s)
if match:
d = eval(match.group())
with lock:
shared_dict['l_x'] = str(d['x'])
shared_dict['a_z'] = str(d['z'])
else:
print("No dictionary found in string.")
def main_func():
total_time = 0
done = False
while not done and total_time < 40000:
sensors = env.hako.execute()
#laser scan
scan = robo.get_state("scan", sensors)
scan_ranges = scan['ranges']
scan_f = min(min(scan_ranges[0:15]), min(scan_ranges[345:359]))
scan_r = min(scan_ranges[30:50])
scan_l = min(scan_ranges[260:280])
#print("scan=" + str(scan_min))
#camera sensor
if total_time % 50 == 0:
img = robo.get_state("camera_image_jpg", sensors)
file_data = img['data__raw']
#file_data = struct.pack('B' * len(image_data), *image_data)
with open("camera-01.jpg" , 'bw') as f:
f.write(file_data)
#motor control
with lock:
shared_dict['d_f'] = scan_f
shared_dict['d_l'] = scan_l
shared_dict['d_r'] = scan_r
motor = robo.get_action('cmd_vel')
motor['linear']['x'] = float(shared_dict['l_x'])
motor['angular']['z'] = float(shared_dict['a_z'])
for channel_id in robo.actions:
robo.hako.write_pdu(channel_id, robo.actions[channel_id])
total_time = total_time + 1
env.reset()
thread = threading.Thread(target=thread_func)
thread.start()
main_func()
# スレッドが終了するまで待機
thread.join()
print("END")
env.reset()
sys.exit(0)
さっそく、動かしてみた
箱庭環境で、動かしてみた結果はこんな感じです(2個目の動画です)。
ChatGPTからの応答ログは以下の通りで、いちよ回避行動はとろうとしてくれています。最後はダメだったけど。。
INPUT DATA: scan_min: d_f=0.0, d_r=0.0, d_l=0.0
ChatGPT REPLY: {"x":0.0, "z":0.0}
INPUT DATA: scan_min: d_f=0.30734673142433167, d_r=0.13915935158729553, d_l=0.2600001394748688
ChatGPT REPLY: {"x":50.0, "z":0.0}
INPUT DATA: scan_min: d_f=0.3057899475097656, d_r=3.5, d_l=0.260001003742218
ChatGPT REPLY: {"x": 50.0, "z": 0.0}
INPUT DATA: scan_min: d_f=0.2699219882488251, d_r=3.5, d_l=0.25992000102996826
ChatGPT REPLY: {"x":0.0, "z":10.0}
INPUT DATA: scan_min: d_f=0.2159612625837326, d_r=0.3870047628879547, d_l=0.25956088304519653
ChatGPT REPLY: {"x":0.0, "z":-7.0}
INPUT DATA: scan_min: d_f=0.3058274984359741, d_r=0.5324169993400574, d_l=0.26813948154449463
ChatGPT REPLY: {"x": 0.0, "z": -5.0}
INPUT DATA: scan_min: d_f=0.1812015026807785, d_r=0.35781750082969666, d_l=0.2681979238986969
ChatGPT REPLY: {"x": 0.0, "z": 10.0}
INPUT DATA: scan_min: d_f=0.17253835499286652, d_r=0.29600223898887634, d_l=0.2717927396297455
ChatGPT REPLY: {"x":0.0, "z":10.0}
INPUT DATA: scan_min: d_f=0.27971455454826355, d_r=0.5244808793067932, d_l=0.46567100286483765
ChatGPT REPLY: {"x":0.0, "z":8.0}
INPUT DATA: scan_min: d_f=3.5, d_r=0.12997381389141083, d_l=0.15790992975234985
ChatGPT REPLY: {"x": 30.0, "z": 0.0}
INPUT DATA: scan_min: d_f=3.5, d_r=3.5, d_l=0.28135761618614197
ChatGPT REPLY: { "x": 50.0, "z": 0.0}
INPUT DATA: scan_min: d_f=0.10723230987787247, d_r=3.5, d_l=0.3093762695789337
ChatGPT REPLY: { "x": 0, "z": -5.0}
INPUT DATA: scan_min: d_f=0.06709610670804977, d_r=3.5, d_l=0.3333658277988434
ChatGPT REPLY: { "x": 30.0, "z": 0.0 }
INPUT DATA: scan_min: d_f=0.04059077799320221, d_r=0.04397480934858322, d_l=0.31892815232276917
ChatGPT REPLY: {"x":0.0, "z":-1.0}
INPUT DATA: scan_min: d_f=3.5, d_r=3.5, d_l=0.31472161412239075
ChatGPT REPLY: {"x":50.0, "z":0.0}
INPUT DATA: scan_min: d_f=3.5, d_r=0.388701856136322, d_l=0.2507988214492798
ChatGPT REPLY: {"x":50.0, "z":0.0}
バックグラウンド
なぜこういうことを始めたかと言いますと、Unityの世界を認識して、Unityの世界を動き回れるAIを、家のパソコンとかラズパイ使って、完全無料で動かしてみたいと思ったのが発端です。
箱庭のアーキテクチャとしてはこんな感じでやればできそうだなと。で、今回はLLM側の試行として、ChatGPT使って素振りしてみたという感じです~。
ちなみに、自分と同じような思いでやられているお方がおられます。こんな感じのを箱庭でやりたい!