24
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ChatGPTのAPI使って、Unity上の箱庭ロボットを動かしてみた!

Posted at

以前、Unity上の箱庭ロボットの強化学習を紹介しましたが、今度は、これを ChatGPT の API 使ってやってみましたので、やり方などをご紹介します。

ただ、今回はあまり詳細の解説はできませんので、もし興味ある方おられましたら、箱庭イベントに参加頂ければご説明しようかと思います~。

ロボットのルール

ChatGPTに与えるロボットのルールは以下のようにしました。
色んなやり方があるかと思いますが、『箱庭のロボットの仕様を書き連ねて、障害物にぶつからないようにうまく動いてねー』といった思いでルール化した感じです。

Rule1: あなたはロボットです。与えられた環境で自由に移動してください。
Rule2: ずっと同じ場所にとどまらず、動き続けてください。停止してはいけません。
Rule3: 障害物にぶつかってはいけません。
Rule4: 障害物との距離は、3方向(前、右、左)からセンサ値を取得できます。
Rule5: 3方向(前、右、左)のいずれかのセンサ値が、
       0.5以下の場合は衝突直前と判断して回避行動をとってください。
       回避する場合は、前進してはいけません。
Rule6: 移動するためのモーターがあります。
       モーター指示値としては、前進と回転の移動が可能です。
Rule7:前進する場合のパラメータ値は、0.0~50.0の範囲となります。
       値が大きいほど早く進みます。値が0の場合は停止します。
Rule8: 回転する場合のパラメータ値は、-10.0~10.0の範囲となります。
       正の値は、左回転、負の値は、右回転です。値が大きいほど回転速度が高くなります。
       値が0の場合は回転しません。
Rule9: 回転する場合は、前進のモータ指示値は必ず0.0にしてください。
Rule10: モーター指示値が、前進、回転ともに0の場合、移動していないとみなします。
Rule11: モーターの指示値は以下の書式で1データのみ出力してください。
       理由およびコメントの出力は不要です。
        { "x": (value of x according to Rule7), "z": (value of z according to Rule9)}
Rule12: 障害物までの距離は、以下の書式で入力します。
        d_f=<前方向にある障害物までの距離>
        d_r=<右方向の障害物までの距離>
        d_l=<左方向の障害物までの距離>
Rule13: Rule12の入力が与えられたら、Rule1~Rule10のルールに従って、Rule11の出力をしてください。
Rule14: 障害物回避する場合は、停止してはいけません。回転して回避することを推奨します。

ちなみに、ルールは英語に変換して使うことにししました(ChatGPTに翻訳してもらいました)。
※日本語だとトークン数が増えるような気がしたのと、曖昧さが減るかな?と思って。

英語版はこちら:

Rule1: You are a robot. Move freely in the given environment.
Rule2: Keep moving and don't stay in the same place. Do not stop.
Rule3: Do not collide with obstacles.
Rule4: You can obtain sensor values for obstacle distance from three directions (front, right, left).
Rule5: If the distance to an obstacle in any of the three directions (front, right, left) is 0.5 or less, assume that a collision is imminent and take evasive action. If you need to avoid obstacles, do not move forward.
Rule6: You have motors for movement. You can move forward and rotate as motor instructions.
Rule7: The parameter value for moving forward ranges from 0.0 to 50.0. The larger the value, the faster you move. If the value is 0, you will stop.
Rule8: The parameter value for rotation ranges from -10.0 to 10.0. Positive values indicate left rotation, and negative values indicate right rotation. The larger the value, the faster the rotation speed. If the value is 0, you will not rotate.
Rule9: If you are rotating, set the motor instruction value for forward movement to 0.0.
Rule10: If the motor instruction values for both forward movement and rotation are 0, consider that you are not moving.
Rule11: Output only one data for motor instructions in the following format. Do not output reasons or comments.
{ "x": (value of x according to Rule7), "z": (value of z according to Rule9)}
Rule12: Input the distance to obstacles in the following format:
d_f=<distance to an obstacle in front>
d_r=<distance to an obstacle on the right>
d_l=<distance to an obstacle on the left>
Rule13: If input for Rule12 is given, follow Rules 1-10 and output Rule 11.
Rule14: If you need to avoid obstacles, do not stop. It is recommended to rotate to avoid obstacles.

Pythonプログラム

作成したコードは以下の通りです。ポイントとしては、ChatGPTのAPIコールは遅い(数秒以上かかる)ので、別スレッドで非同期化し、2秒単位でロボット指示値をもらう設計にしてます。応答が返るまでは前回値で動きます。

※補足:毎回、ルールとセンサ値を渡すのは冗長な気がしていますが、仕方ないですね。。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import sys
from hako_binary import offset_map
from hako_binary import binary_writer
from hako_binary import binary_reader
import qtable_model2
import hako_env
import hako
import time
import signal
from types import MethodType
import hako_robomodel_any
import struct
import threading
import openai
import re

def handler(signum, frame):
  print(f'SIGNAL(signum={signum})')
  sys.exit(0)
  
print("START TB3 TEST")

# signal.SIGALRMのシグナルハンドラを登録
signal.signal(signal.SIGINT, handler)

#create hakoniwa env
env = hako_env.make("SampleRobo", "any", "dev/ai/custom.json")
print("WAIT START:")
env.hako.wait_event(hako.HakoEvent['START'])
print("WAIT RUNNING:")
env.hako.wait_state(hako.HakoState['RUNNING'])

print("GO:")

#do simulation
def delta_usec():
  return 20000

robo = env.robo()
robo.delta_usec = delta_usec
# OpenAI APIキーを設定する
openai.api_key = "OPENAI API KEY"

rule = None
with open("dev/ai/rule.txt", "r") as file:
    rule = file.read()

print(rule)

# OpenAI APIでモデルを指定して応答を取得する
def get_response(rule, query_data):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": rule + "\n" + query_data }
        ]
    )
    return response["choices"][0]["message"]["content"]

shared_dict = {
  'd_f': 3.5,
  'd_l': 3.5,
  'd_r': 3.5,
  'l_x': "0.0",
  'a_z': "0.0"
}

lock = threading.Lock()
def thread_func():
    value = "50.0" 
    global shared_dict
    while True:
      time.sleep(2.0)
      d_f = shared_dict.get('d_f', 3.5)
      d_r = shared_dict.get('d_r', 3.5)
      d_l = shared_dict.get('d_l', 3.5)
      input_data = "d_f="+str(d_f)+", d_r="+str(d_r)+ ", d_l="+str(d_l)
      print("INPUT DATA: scan_min: " + input_data)
      response = get_response(rule, input_data)
      print("ChatGPT REPLY: " + response)
      s = response
      match = re.search(r"\{[^}]+\}", s)

      if match:
          d = eval(match.group())
          with lock:
            shared_dict['l_x'] = str(d['x'])
            shared_dict['a_z'] = str(d['z'])
      else:
          print("No dictionary found in string.")


def main_func():
  total_time = 0
  done = False
  while not done and total_time < 40000:
    
    sensors = env.hako.execute()

    #laser scan
    scan = robo.get_state("scan", sensors)
    scan_ranges = scan['ranges']
    scan_f = min(min(scan_ranges[0:15]), min(scan_ranges[345:359]))
    scan_r = min(scan_ranges[30:50])
    scan_l = min(scan_ranges[260:280])
    #print("scan=" + str(scan_min))

    #camera sensor
    if total_time % 50 == 0:
      img = robo.get_state("camera_image_jpg", sensors)
      file_data = img['data__raw']
      #file_data = struct.pack('B' * len(image_data), *image_data)
      with open("camera-01.jpg" , 'bw') as f:
          f.write(file_data)

      #motor control
      with lock:
        shared_dict['d_f'] = scan_f
        shared_dict['d_l'] = scan_l
        shared_dict['d_r'] = scan_r
        motor = robo.get_action('cmd_vel')
        motor['linear']['x'] = float(shared_dict['l_x'])
        motor['angular']['z'] = float(shared_dict['a_z'])
      
      for channel_id in robo.actions:
        robo.hako.write_pdu(channel_id, robo.actions[channel_id])
    
    total_time = total_time + 1

  env.reset()

thread = threading.Thread(target=thread_func)
thread.start()

main_func()

# スレッドが終了するまで待機
thread.join()

print("END")
env.reset()
sys.exit(0)

さっそく、動かしてみた

箱庭環境で、動かしてみた結果はこんな感じです(2個目の動画です)。

ChatGPTからの応答ログは以下の通りで、いちよ回避行動はとろうとしてくれています。最後はダメだったけど。。

INPUT DATA: scan_min: d_f=0.0, d_r=0.0, d_l=0.0
ChatGPT REPLY: {"x":0.0, "z":0.0}
INPUT DATA: scan_min: d_f=0.30734673142433167, d_r=0.13915935158729553, d_l=0.2600001394748688
ChatGPT REPLY: {"x":50.0, "z":0.0}
INPUT DATA: scan_min: d_f=0.3057899475097656, d_r=3.5, d_l=0.260001003742218
ChatGPT REPLY: {"x": 50.0, "z": 0.0}
INPUT DATA: scan_min: d_f=0.2699219882488251, d_r=3.5, d_l=0.25992000102996826
ChatGPT REPLY: {"x":0.0, "z":10.0}
INPUT DATA: scan_min: d_f=0.2159612625837326, d_r=0.3870047628879547, d_l=0.25956088304519653
ChatGPT REPLY: {"x":0.0, "z":-7.0}
INPUT DATA: scan_min: d_f=0.3058274984359741, d_r=0.5324169993400574, d_l=0.26813948154449463
ChatGPT REPLY: {"x": 0.0, "z": -5.0}
INPUT DATA: scan_min: d_f=0.1812015026807785, d_r=0.35781750082969666, d_l=0.2681979238986969
ChatGPT REPLY: {"x": 0.0, "z": 10.0}
INPUT DATA: scan_min: d_f=0.17253835499286652, d_r=0.29600223898887634, d_l=0.2717927396297455
ChatGPT REPLY: {"x":0.0, "z":10.0}
INPUT DATA: scan_min: d_f=0.27971455454826355, d_r=0.5244808793067932, d_l=0.46567100286483765
ChatGPT REPLY: {"x":0.0, "z":8.0}
INPUT DATA: scan_min: d_f=3.5, d_r=0.12997381389141083, d_l=0.15790992975234985
ChatGPT REPLY: {"x": 30.0, "z": 0.0}
INPUT DATA: scan_min: d_f=3.5, d_r=3.5, d_l=0.28135761618614197
ChatGPT REPLY: { "x": 50.0, "z": 0.0}
INPUT DATA: scan_min: d_f=0.10723230987787247, d_r=3.5, d_l=0.3093762695789337
ChatGPT REPLY: { "x": 0, "z": -5.0}
INPUT DATA: scan_min: d_f=0.06709610670804977, d_r=3.5, d_l=0.3333658277988434
ChatGPT REPLY: { "x": 30.0, "z": 0.0 }
INPUT DATA: scan_min: d_f=0.04059077799320221, d_r=0.04397480934858322, d_l=0.31892815232276917
ChatGPT REPLY: {"x":0.0, "z":-1.0}
INPUT DATA: scan_min: d_f=3.5, d_r=3.5, d_l=0.31472161412239075
ChatGPT REPLY: {"x":50.0, "z":0.0}
INPUT DATA: scan_min: d_f=3.5, d_r=0.388701856136322, d_l=0.2507988214492798
ChatGPT REPLY: {"x":50.0, "z":0.0}

バックグラウンド

なぜこういうことを始めたかと言いますと、Unityの世界を認識して、Unityの世界を動き回れるAIを、家のパソコンとかラズパイ使って、完全無料で動かしてみたいと思ったのが発端です。

箱庭のアーキテクチャとしてはこんな感じでやればできそうだなと。で、今回はLLM側の試行として、ChatGPT使って素振りしてみたという感じです~。

image.png

ちなみに、自分と同じような思いでやられているお方がおられます。こんな感じのを箱庭でやりたい!

24
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?