2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

TROCCO®Advent Calendar 2024

Day 5

TROCCO REST APIをPythonで操作:GitHub Copilot で試してみた結果

Last updated at Posted at 2024-12-04

概要

本記事では、GitHub Copilot を使用して TROCCO REST API を Python で実行する手順を紹介します。Web API に対する理解が浅いと、API を実行するプログラムの開発で苦戦することがあります。そこで、AI の力を借りてこれらの作業を効率化できないかと考えました。本記事では以下の3つの手順を実施します。

  1. GitHub Copilot Chat in GitHub を使用してクラスを記述
  2. TROCCO 上でジョブの作成とトークンの取得
  3. ノートブック型 Python 環境(Databricks)から TROCCO REST API を実行
最終的なコードはこちら
import time
import requests
from typing import Optional

class TroccoAPIClient:
    """Trocco APIクライアントクラス。

    Args:
        api_key (str): Trocco APIのAPIキー。
        wait_time (int, optional): リクエスト間の待機時間(秒)。デフォルトは30秒。
        max_wait_time (int, optional): 最大待機時間(秒)。デフォルトは3600秒(60分)。

    Attributes:
        api_key (str): Trocco APIのAPIキー。
        base_url (str): Trocco APIのベースURL。
        headers (dict): リクエストヘッダー。
        wait_time (int): リクエスト間の待機時間(秒)。
        max_wait_time (int): 最大待機時間(秒)。
    """

    def __init__(self, api_key: str, wait_time: int = 30, max_wait_time: int = 3600) -> None:
        self.api_key = api_key
        self.base_url = "https://trocco.io/api"
        self.headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json"
        }
        self.wait_time = wait_time
        self.max_wait_time = max_wait_time

    def post_job(self, payload: dict) -> Optional[dict]:
        """ジョブを作成する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: APIからのレスポンス。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_job(self, job_id: str) -> Optional[dict]:
        """特定のジョブの詳細を取得する。

        Args:
            job_id (str): 取得したいジョブのID。

        Returns:
            Optional[dict]: ジョブの詳細。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs/{job_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_jobs(self, params: dict = {}) -> Optional[dict]:
        """ジョブの一覧を取得する。

        Args:
            params (dict, optional): クエリパラメータ。

        Returns:
            Optional[dict]: ジョブの一覧。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def execute_job_and_wait(self, payload: dict) -> Optional[dict]:
        """ジョブを作成し、完了するまで待機して結果を取得する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: 最終的なジョブの結果。失敗した場合はNone。
        """
        job = self.post_job(payload)
        if not job:
            return None

        job_id = job.get("id")
        if not job_id:
            print("ジョブIDを取得できませんでした。")
            return None

        start_time = time.time()
        while True:
            job_status = self.get_job(job_id)
            if job_status:
                status = job_status.get("status")
                if status == "succeeded":
                    print("ジョブが成功しました。")
                    return job_status
                elif status == "error":
                    print("ジョブが失敗しました。")
                    return job_status
                else:
                    print(f"ジョブのステータス: {status}。待機します。")
            else:
                print("ジョブステータスの取得に失敗しました。リトライします。")

            if time.time() - start_time > self.max_wait_time:
                print("最大待機時間を超えました。")
                return None

            time.sleep(self.wait_time)
# TroccoAPIClientのインスタンスを作成
api_key = "vV5vN6sCNivahnSYPYzzzzzzzzzzzz"  # ここにAPIキーを入力してください
client = TroccoAPIClient(api_key)

# ジョブ作成のためのペイロードを定義
payload = {
    "job_definition_id": "287317"
}

# ジョブを実行し、完了まで待機
result = client.execute_job_and_wait(payload)

# 結果を表示
if result:
    print("ジョブの結果:")
    print(result)
else:
    print("ジョブの取得に失敗しました。")

TROCCO REST API を利用するメリットとして、外部のワークフロー機能を活用できる点があります。TROCCO にもワークフロー機能はありますが、場合によっては外部のワークフロー機能から実行したいことがあります。本記事を参考に、ユースケースに応じた使い分けを実施し、TROCCO をより活用できるようになれば幸いです。

また、 GitHub Copilot Chat in GitHub 上でコードを生成しておりますが、 WEB 検索機能をもつ生成 AI サービスである ChatGPT や Microsoft Copilot でも同様にコードを生成できるはずです。出力されるコードを比較することで生成 AI サービスの特性を理解することができるかもしれません。

コードの作成と実行

1. GitHub Copilot Chat in GitHub を使用してクラスを記述

1-1. コードに含める API の特定

ジョブ実行に関連する以下の3つの API をコードに含めることにしました。

  1. trocco API - 転送ジョブ実行
  2. trocco API - 転送ジョブ実行結果取得
  3. trocco API - 転送ジョブ一覧取得

image.png

引用元:trocco API - 転送ジョブ実行

1-2. GitHub Copilot Chat in GitHub でコードを作成

以下の2つのプロンプトを使用して、コードを生成しました。

# これから提示するリンク先の URL の API を、制約を満たすような Python のクラスと実行コードとして記述してください。

## 制約

- 認証情報の設定、リクエストヘッダー、必要なパラメータなどを含める。
- Python の `requests` ライブラリを使用する。
- API エンドポイントにリクエストを送信し、レスポンスを表示する。
- Jupyter のノートブック環境から実行することを前提とする。
- 実行する API が非同期処理の場合は、実行と実行結果を取得するメソッドを追加すること。
- 待機時間(デフォルト30秒)と最大待機時間(デフォルト60分)をパラメータで指定できるようにする。
- 一時的なエラーが発生した場合に自動的にリトライすること。
- ベース URL は https://trocco.io/api とする。
- 認証方法は、`Authorization: Token {{API KEY}}` と指定する。
- Google Style の Docstring とデータ型ヒントを記述すること。
## リンク先

- https://documents.trocco.io/apidocs/post-job
- https://documents.trocco.io/apidocs/get-job
- https://documents.trocco.io/apidocs/get-jobs

image.png

1-3. 作成されたコードをコピー

コードはこちら
import time
import requests
from typing import Optional

class TroccoAPIClient:
    """Trocco APIクライアントクラス。

    Args:
        api_key (str): Trocco APIのAPIキー。
        wait_time (int, optional): リクエスト間の待機時間(秒)。デフォルトは30秒。
        max_wait_time (int, optional): 最大待機時間(秒)。デフォルトは3600秒(60分)。

    Attributes:
        api_key (str): Trocco APIのAPIキー。
        base_url (str): Trocco APIのベースURL。
        headers (dict): リクエストヘッダー。
        wait_time (int): リクエスト間の待機時間(秒)。
        max_wait_time (int): 最大待機時間(秒)。
    """

    def __init__(self, api_key: str, wait_time: int = 30, max_wait_time: int = 3600) -> None:
        self.api_key = api_key
        self.base_url = "https://api.trocco.io"
        self.headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json"
        }
        self.wait_time = wait_time
        self.max_wait_time = max_wait_time

    def post_job(self, payload: dict) -> Optional[dict]:
        """ジョブを作成する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: APIからのレスポンス。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_job(self, job_id: str) -> Optional[dict]:
        """特定のジョブの詳細を取得する。

        Args:
            job_id (str): 取得したいジョブのID。

        Returns:
            Optional[dict]: ジョブの詳細。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs/{job_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_jobs(self, params: dict = {}) -> Optional[dict]:
        """ジョブの一覧を取得する。

        Args:
            params (dict, optional): クエリパラメータ。

        Returns:
            Optional[dict]: ジョブの一覧。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def execute_job_and_wait(self, payload: dict) -> Optional[dict]:
        """ジョブを作成し、完了するまで待機して結果を取得する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: 最終的なジョブの結果。失敗した場合はNone。
        """
        job = self.post_job(payload)
        if not job:
            return None

        job_id = job.get("id")
        if not job_id:
            print("ジョブIDを取得できませんでした。")
            return None

        start_time = time.time()
        while True:
            job_status = self.get_job(job_id)
            if job_status:
                status = job_status.get("status")
                if status == "success":
                    print("ジョブが成功しました。")
                    return job_status
                elif status == "failure":
                    print("ジョブが失敗しました。")
                    return job_status
                else:
                    print(f"ジョブのステータス: {status}。待機します。")
            else:
                print("ジョブステータスの取得に失敗しました。リトライします。")

            if time.time() - start_time > self.max_wait_time:
                print("最大待機時間を超えました。")
                return None

            time.sleep(self.wait_time)
# TroccoAPIClientのインスタンスを作成
api_key = "YOUR_API_KEY"  # ここにAPIキーを入力してください
client = TroccoAPIClient(api_key)

# ジョブ作成のためのペイロードを定義
payload = {
    # 必要なパラメータをここに記述します
    # 例:
    # "job_type": "your_job_type",
    # "config": { ... }
}

# ジョブを実行し、完了まで待機
result = client.execute_job_and_wait(payload)

# 結果を表示
if result:
    print("ジョブの結果:")
    print(result)
else:
    print("ジョブの取得に失敗しました。")

2. TROCCO 上でジョブの作成とトークンの取得

2-1. TROCCO で API で実行するジョブの作成と job_definition_id の取得

job_definition_id は、ジョブを TROCCO 上で表示した際の URL、またはジョブ名の横から取得できます。

image.png

2-2. TROCCO API KEY の取得

image.png

3. ノートブック型 Python 環境(Databricks)から TROCCO REST API を実行

3-1. コードを貼り付け

image.png

3-2. コードの検証と実行

api_key に TROCCO API KEY を、payload 変数に job_definition_id をキーとして取得済みの job_definition_id を設定し、実行します。

# TroccoAPIClientのインスタンスを作成
api_key = "vV5vN6sCNivahnSYPYzzzzzzzzzzzz"  # ここにAPIキーを入力してください
client = TroccoAPIClient(api_key)

# ジョブ作成のためのペイロードを定義
payload = {
    "job_definition_id": "287317"
}

# ジョブを実行し、完了まで待機
result = client.execute_job_and_wait(payload)

# 結果を表示
if result:
    print("ジョブの結果:")
    print(result)
else:
    print("ジョブの取得に失敗しました。")

image.png

ここでエラーもなく実行できればよかったのですが、以下の修正が必要でした。

  1. URL が https://api.trocco.io となっていたため、https://trocco.io/api に修正
  2. 成功のステータスが success となっていたため、succeeded に修正 *1
  3. エラーのステータスが failure となっていたため、error に修正 *1

*1 ジョブのステータスはドキュメントで以下のように記載されています。

image.png

引用元:trocco API - 転送ジョブ実行結果取得

修整後のコードはこちら
import time
import requests
from typing import Optional

class TroccoAPIClient:
    """Trocco APIクライアントクラス。

    Args:
        api_key (str): Trocco APIのAPIキー。
        wait_time (int, optional): リクエスト間の待機時間(秒)。デフォルトは30秒。
        max_wait_time (int, optional): 最大待機時間(秒)。デフォルトは3600秒(60分)。

    Attributes:
        api_key (str): Trocco APIのAPIキー。
        base_url (str): Trocco APIのベースURL。
        headers (dict): リクエストヘッダー。
        wait_time (int): リクエスト間の待機時間(秒)。
        max_wait_time (int): 最大待機時間(秒)。
    """

    def __init__(self, api_key: str, wait_time: int = 30, max_wait_time: int = 3600) -> None:
        self.api_key = api_key
        self.base_url = "https://trocco.io/api"
        self.headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json"
        }
        self.wait_time = wait_time
        self.max_wait_time = max_wait_time

    def post_job(self, payload: dict) -> Optional[dict]:
        """ジョブを作成する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: APIからのレスポンス。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_job(self, job_id: str) -> Optional[dict]:
        """特定のジョブの詳細を取得する。

        Args:
            job_id (str): 取得したいジョブのID。

        Returns:
            Optional[dict]: ジョブの詳細。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs/{job_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_jobs(self, params: dict = {}) -> Optional[dict]:
        """ジョブの一覧を取得する。

        Args:
            params (dict, optional): クエリパラメータ。

        Returns:
            Optional[dict]: ジョブの一覧。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def execute_job_and_wait(self, payload: dict) -> Optional[dict]:
        """ジョブを作成し、完了するまで待機して結果を取得する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: 最終的なジョブの結果。失敗した場合はNone。
        """
        job = self.post_job(payload)
        if not job:
            return None

        job_id = job.get("id")
        if not job_id:
            print("ジョブIDを取得できませんでした。")
            return None

        start_time = time.time()
        while True:
            job_status = self.get_job(job_id)
            if job_status:
                status = job_status.get("status")
                if status == "succeeded":
                    print("ジョブが成功しました。")
                    return job_status
                elif status == "error":
                    print("ジョブが失敗しました。")
                    return job_status
                else:
                    print(f"ジョブのステータス: {status}。待機します。")
            else:
                print("ジョブステータスの取得に失敗しました。リトライします。")

            if time.time() - start_time > self.max_wait_time:
                print("最大待機時間を超えました。")
                return None

            time.sleep(self.wait_time)

3-3. 実行結果を確認

転送ジョブが成功したことを確認します。

image.png

image.png

まとめ

本記事では、TROCCO REST API を Python から実行する手順を紹介しました。AI が提示したコードが必ずしも正確ではないことを伝えるため、あえて手動で修正する手順を含めました。AI にエラーのないコードを生成してもらいたい場合、実装してもらう機能を減らすことで、期待通りに動作する可能性が高まります。GitHub Copilot を活用しつつ、必要に応じて手動での修正を加えることで、効率的に開発を進めることができます。AI ツールと適切に連携することで、開発効率を向上させる一助になれば幸いです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?