はじめに
- 本記事ではAzure OpenAI Serviceにてプレビューリリースされた Assistants API を使用して、自然言語によるUnattended Robot実行の実装方法について説明します。
- この実装により標題通り「○○やっといて!」というユーザープロンプトをトリガーとしてロボットによる自動化処理を実行することできます。しかも プロンプトの文脈に応じてパラメーターを自動抽出して引数を渡すこともできます!
- Unattended Robotの実行には APIトリガー を使用して実行用エンドポイントを作成します。Assistants APIのFunction Calling によってUnattended Robot実行のための関数(今回はPythonで実装)と引数を抽出して実行します。
処理の流れ
-
今回使用するサービスの一覧です。
- Azure OpenAI Service
- UiPath Automation Cloud (Community版でもOK)
- UiPath Unattended Robot (以降URと略す)
- Google Colaboratory (以降Colabと略す)
-
オブジェクトごとの処理の流れです。青色矢印がメッセージの流れ、赤色矢印がファンクション実行の流れです。
デモ動画
- 完成形はこのように動作します。
実装手順
- 実装手順の大まかな流れについて説明します。
- プロセス登録: UiPath Studioにて自動化処理のワークフローを作成し、Automation Cloud上のOrchestratorにパブリッシュしてプロセスとして登録します。
- UR設定: Automation Cloudにてプロセスの実行環境であるURの設定を行います。
- APIトリガー設定: Automation CloudにてURジョブ実行を容易に行うためにAPIトリガーを設定します。
- Assistant作成: Google ColabにてPythonコードでAssistants APIを実行し、Assistantを作成します。
- Python関数作成: AssistantからのFunction Callingにて実行されるPython関数を作成します。この関数がコールされるとAPIトリガーによってURジョブが実行されるように設定しておきます。
- Assistantスレッド作成してプロンプト実行: Assistants APIにてAssistantスレッドを作成します。この時、ユーザーからのプロンプトを受け付けてその内容に応じてPython関数をコールしてURジョブを実行し、結果を出力します。
- 実装手順の詳細について説明します。
1. プロセス登録
-
URジョブとして実行するプロセスを登録します。例として下記のようなワークフローをUiPath Studioで作成してAutomation Cloud上のOrchestratorにパブリッシュします。
-
RPA_Challenge01:
browser
入力引数で指定されたブラウザー(ChromeまたはEdge)で RPAチャレンジ を実行して結果を出力引数として返すワークフロー
-
Automation Cloud にログインし、Orchestratorのフォルダーに移動します。
2. UR設定
-
マシンテンプレートを作成し、Production(Unattended)ランタイムを1にセットします。生成されたクライアントIDとクライアントシークレットをメモします。
-
Windowsマシンにて UiPath Studioインストーラー をダウンロードしてUnattended Robotとしてインストールします。先ほどメモしたクライアントIDとクライアントシークレットを使用してOrchestratorに接続します。
-
ロボットアカウントを作成し、プロセスを登録したフォルダーに割り当てます。マシンテンプレートも同じフォルダーに割り当てます。
-
手動でURジョブ実行できることを確認します。
3. APIトリガー設定
- それぞれのプロセスを容易に実行するためにAPIトリガーを設定します。詳細な手順は下記の記事をご参照ください。
-
出力引数を受け取るために呼び出しモードは 同期 (ロングポーリング) を使用します。
長時間実行されるURジョブはPythonコードからの呼び出し時にタイムアウトになる可能性があります。非同期ポーリング によってタイムアウトは回避できますが出力引数を受け取るためには別途APIコールする必要があります。
4. Assistant作成
-
Azure OpenAI Serviceのインスタンスをデプロイします。Assistants APIを使用するには2024年2月現在では 米国東部2、オーストラリア東部またはスウェーデン中部リージョンでのみ利用できます。今回はスウェーデン中部(
Sweden Central
)でデプロイします。
-
Azure AI StudioにてGPT-4モデルをデプロイします。デプロイ名は
gpt-4
とします。
-
Google ColabにてPythonコードを順次実行します。(これらのコードは Assistants function calling with Bing Search を参考にしております)
- まず必要なライブラリーをインストールします。
!pip install openai requests typing
- openaiのライブラリ―バージョンは1.x系が必要となります。古いバージョンを使用している場合には
!pip install openai --upgrade
を実行します。
- 次のコマンドを実行します。Azure OpenAIリソース名とAPIキーは適宜変更します。
import os # 環境に応じて変更 os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<Azure OpenAI リソース名>.openai.azure.com/" os.environ["AZURE_OPENAI_KEY"] = "<Azure OpenAI APIキー>" os.environ["AZURE_OPENAI_DEPLOYMENT"] = "gpt-4"
- まず必要なライブラリーをインストールします。
-
Azure OpenAI Serviceをclientとして定義します。
from openai import AzureOpenAI client = AzureOpenAI( api_key=os.getenv("AZURE_OPENAI_KEY"), api_version="2024-02-15-preview", azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") )
-
アシスタントを作成します。アシスタント名、指示を指定します。ツールとしてFuncation Callingで実行する関数と引数を定義します。
- 関数名:
executeUR
- パラメーター:
-
process
: Automation Cloudでこのあと登録するプロセス名をenum
で列挙します。(LogTest
やRPA-Challenge01
は適宜変更します) -
input
: プロセスに渡す引数を定義します。
-
# アシスタントの作成 assistant = client.beta.assistants.create( name="Automation-Assistant", # アシスタント名 instructions="あなたはAutomation Cloudアシスタントです。ユーザーからの指示に従って関数を実行し、結果を答えてください。", # 指示 (アシスタントの定義や目的など) model=os.getenv("AZURE_OPENAI_DEPLOYMENT"), # モデルのデプロイ名 # アシスタントが使用できるツールのリスト (ここではFunction callingで1つの外部APIを指定) tools=[{ "type": "function", "function": { "name": "executeUR", "description": "Execute Unattended Robot based on Process name", "parameters": { "type": "object", "properties": { "process": {"type": "string", "enum": [ "LogTest", "RPA-Challenge01" ]}, # プロセス名を列挙 "input": {"type": "string", "description": "Input parameter"} # 引数 }, "required": ["process", "input"] # 必須フィールド } } }] )
- 関数名:
-
アシスタントの作成を確認します。
# 確認 for k,v in dict(assistant).items(): print(k + ": ", v)
-
先ほど出力された
id
を環境変数にセットします。 -
Automation Cloudの個人用アクセストークン(PAT)を環境変数にセットします。PATの取得方法は こちらの記事 をご覧ください。スコープは
OR.Jobs
を指定します。import time import json import requests from typing import Optional # 環境に応じて変更 os.environ["AZURE_OPENAI_ASSISTANT"] = "<アシスタントID>" os.environ["UIPATH_PAT"] = "<個人用アクセストークン>"
-
なお作成されたアシスタントは Azure AI Studio でも確認できます。
5. Python関数作成
-
次にFunction Callingによって実行される関数
executeUR
を定義します。- この関数はプロセス名と引数を受け取り、プロセスごとに定義したAPIトリガーにてURジョブを実行します。(APIトリガーのエンドポイントは適宜変更します)
- 入力引数名もプロセスごとに適宜変更します。この例では LogTestプロセスは
message
、RPA-Challenge01プロセスはbrowser
という入力引数を取ります。 - APIトリガーのステータスコードは呼び出しモードが同期(ロングポーリング)の時は200(OK)、非同期の時は202(Accepted)となります。
- 同期呼び出しによるURジョブ正常実行時には出力引数が取得できますので、この関数の戻り値として返します。
# 外部API def executeUR(process, input=''): print('>Process: ', process) print('>Input: ', input) # Personal Access Token auth = 'Bearer ' + os.getenv("UIPATH_PAT") # Header headers = { 'Authorization': auth } base_url = '' params = {} # ProcessごとのAPI Triggerエンドポイント if process == 'LogTest': base_url = '<LogTestプロセスを実行するAPIトリガーのエンドポイント>' params = { 'message': input } elif process == 'RPA-Challenge01': base_url = '<RPA-Challenge01プロセスを実行するAPIトリガーのエンドポイント>' params = { 'browser': input } else: print(f"Error: Process {process} is not defined!") return None # APIリクエストを送信 response = requests.get(base_url, headers=headers, params=params) if response.status_code == 200 or response.status_code == 202: # レスポンスが成功した場合、データを解析 data = response.json() result = data['output'] return result else: # エラーが発生した場合、ステータスコードを出力 print(f"Error: Unable to execute Unattended Robot. Status code {response.status_code}") return None
-
関数呼び出しでアシスタントに必要な3つの関数を定義します。これらすべての関数は最後のプロンプト実行時に使用され、URジョブ実行の状態監視やメッセージ・パラメーターの受け渡しなどを行います。
def poll_run_till_completion( client: AzureOpenAI, thread_id: str, run_id: str, available_functions: dict, verbose: bool, max_steps: int = 10, wait: int = 3, ) -> None: """ Poll a run until it is completed or failed or exceeds a certain number of iterations (MAX_STEPS) with a preset wait in between polls @param client: OpenAI client @param thread_id: Thread ID @param run_id: Run ID @param assistant_id: Assistant ID @param verbose: Print verbose output @param max_steps: Maximum number of steps to poll @param wait: Wait time in seconds between polls """ if (client is None and thread_id is None) or run_id is None: print("Client, Thread ID and Run ID are required.") return try: cnt = 0 while cnt < max_steps: run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id) if verbose: print("Poll {}: {}".format(cnt, run.status)) cnt += 1 if run.status == "requires_action": tool_responses = [] if ( run.required_action.type == "submit_tool_outputs" and run.required_action.submit_tool_outputs.tool_calls is not None ): tool_calls = run.required_action.submit_tool_outputs.tool_calls for call in tool_calls: if call.type == "function": if call.function.name not in available_functions: raise Exception("Function requested by the model does not exist") function_to_call = available_functions[call.function.name] tool_response = function_to_call(**json.loads(call.function.arguments)) tool_responses.append({"tool_call_id": call.id, "output": tool_response}) run = client.beta.threads.runs.submit_tool_outputs( thread_id=thread_id, run_id=run.id, tool_outputs=tool_responses ) if run.status == "failed": print("Run failed.") break if run.status == "completed": break time.sleep(wait) except Exception as e: print(e) def create_message( client: AzureOpenAI, thread_id: str, role: str = "", content: str = "", file_ids: Optional[list] = None, metadata: Optional[dict] = None, message_id: Optional[str] = None, ) -> any: """ Create a message in a thread using the client. @param client: OpenAI client @param thread_id: Thread ID @param role: Message role (user or assistant) @param content: Message content @param file_ids: Message file IDs @param metadata: Message metadata @param message_id: Message ID @return: Message object """ if metadata is None: metadata = {} if file_ids is None: file_ids = [] if client is None: print("Client parameter is required.") return None if thread_id is None: print("Thread ID is required.") return None try: if message_id is not None: return client.beta.threads.messages.retrieve(thread_id=thread_id, message_id=message_id) if file_ids is not None and len(file_ids) > 0 and metadata is not None and len(metadata) > 0: return client.beta.threads.messages.create( thread_id=thread_id, role=role, content=content, file_ids=file_ids, metadata=metadata ) if file_ids is not None and len(file_ids) > 0: return client.beta.threads.messages.create( thread_id=thread_id, role=role, content=content, file_ids=file_ids ) if metadata is not None and len(metadata) > 0: return client.beta.threads.messages.create( thread_id=thread_id, role=role, content=content, metadata=metadata ) return client.beta.threads.messages.create(thread_id=thread_id, role=role, content=content) except Exception as e: print(e) return None def retrieve_and_print_messages( client: AzureOpenAI, thread_id: str, verbose: bool, out_dir: Optional[str] = None ) -> any: """ Retrieve a list of messages in a thread and print it out with the query and response @param client: OpenAI client @param thread_id: Thread ID @param verbose: Print verbose output @param out_dir: Output directory to save images @return: Messages object """ if client is None and thread_id is None: print("Client and Thread ID are required.") return None try: messages = client.beta.threads.messages.list(thread_id=thread_id) display_role = {"user": "User query", "assistant": "Assistant response"} prev_role = None if verbose: print("\n\nCONVERSATION:") for md in reversed(messages.data): if prev_role == "assistant" and md.role == "user" and verbose: print("------ \n") for mc in md.content: # Check if valid text field is present in the mc object if mc.type == "text": txt_val = mc.text.value # Check if valid image field is present in the mc object elif mc.type == "image_file": image_data = client.files.content(mc.image_file.file_id) if out_dir is not None: out_dir_path = Path(out_dir) if out_dir_path.exists(): image_path = out_dir_path / (mc.image_file.file_id + ".png") with image_path.open("wb") as f: f.write(image_data.read()) if verbose: if prev_role == md.role: print(txt_val) else: print("{}:\n{}".format(display_role[md.role], txt_val)) prev_role = md.role return messages except Exception as e: print(e) return None
6. Assistantスレッド作成してURジョブ実行
-
Assistantスレッドを作成します。この時、ユーザーからのプロンプトを受け付けてその内容に応じてPython関数をコールしてURジョブを実行します。
# Main prompt = input('Enter your request: ') message = {"role": "user", "content": prompt} available_functions = {"executeUR": executeUR} verbose_output = True thread = client.beta.threads.create() create_message(client, thread.id, message["role"], message["content"]) run = client.beta.threads.runs.create(thread_id=thread.id, assistant_id=os.getenv("AZURE_OPENAI_ASSISTANT")) poll_run_till_completion( client=client, thread_id=thread.id, run_id=run.id, available_functions=available_functions, verbose=verbose_output ) messages = retrieve_and_print_messages(client=client, thread_id=thread.id, verbose=verbose_output)
-
プロンプトにて
「テストですよ!」とログ出力しといて
と入力してみます。おおざっぱな指示ですが、アシスタントがパラメーターを自動抽出して関数をexecuteUR({"process":"LogTest","input":"テストですよ!"})
と実行してくれます。LogTestプロセスに渡すmessage引数を使ってURジョブ実行されます。
-
もう一度Assistantスレッド作成のコードを実行します。今度はプロンプトにて
RPAチャレンジをEdgeで実行して結果を表示して!
と入力してみます。今回もアシスタントがパラメーターを自動抽出して関数をexecuteUR({"process":"RPA-Challenge01","input":"Edge"})
と実行してくれます。さらに実行結果としての出力引数(正答率や実行時間)も表示してくれます。全体の挙動は デモ動画 の通りです。
-
process
パラメーターはAssistant作成時にenum
で取り得る値を列挙したため、その中から最も相応しいものが選択されます。いずれにも合わないような雑すぎる指示にはアシスタントがちゃんとエラーを返してくれます。
参考記事
おわりに
今回はAzure OpenAI ServiceにてAssistants APIを利用してURジョブを実行してみました。これまでもFunction Callingは実装可能でしたが、Assistants APIによって関数の定義とThreadおよびRunによる状態管理が分かりやすくなったと思います。少々あいまいなプロンプトでも適切にパラメーターを自動抽出して関数を実行できるのでとても便利です。アイディア次第でさまざまな自動化に利用できますので是非試してみてください!