LoginSignup
7
5

Dify APIを使ったワークフロー実行するPythonスクリプト

Posted at

Pythonプログラムから、自分のローカルのDify APIを使ってワークフローを実行するスクリプトが動作してくれたので、メモをしておきます。

import requests
import json
import argparse

API_KEY = "app-XXXXXXXXXXXXXue"  # DifyのAPIキーを入れてください
BASE_URL = "http://localhost/v1"

def run_workflow(inputs, response_mode, user):
    url = f"{BASE_URL}/workflows/run"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    data = {
        "inputs": {"input_text": inputs["input_text"]},
        "response_mode": response_mode,
        "user": user
    }
    print(f"Sending request to {url} with data: {json.dumps(data, ensure_ascii=False)}")  # リクエストデータを出力
    response = requests.post(url, headers=headers, data=json.dumps(data))

    print(f"Received response with status code {response.status_code}")  # ステータスコードを出力

    if response.status_code == 200:
        if response_mode == "blocking":
            result = response.json()
            print(f"Response JSON: {result}")  # レスポンス全体を出力
            if "data" in result and "outputs" in result["data"] and "text" in result["data"]["outputs"]:
                return result["data"]["outputs"]["text"]
            else:
                print("Error: 'text' not found in the API response.")
        elif response_mode == "streaming":
            for chunk in response.iter_content(chunk_size=None):
                if chunk:
                    print(chunk.decode())
    else:
        print(f"Request failed with status code {response.status_code}")
        try:
            print(response.json())
        except json.JSONDecodeError:
            print(response.text)

def stop_workflow(task_id, user):
    url = f"{BASE_URL}/workflows/{task_id}/stop"
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    data = {"user": user}
    response = requests.post(url, headers=headers, data=json.dumps(data))

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed with status code {response.status_code}")

def get_parameters(user):
    url = f"{BASE_URL}/parameters?user={user}"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed with status code {response.status_code}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run workflow")
    parser.add_argument("--input", type=str, required=True, help="Input file path")
    parser.add_argument("--response_mode", type=str, help="Response mode (blocking or streaming)")
    parser.add_argument("--user", type=str, default="example_user", help="User identifier")

    args = parser.parse_args()

    with open(args.input, 'r', encoding='utf-8') as file:
        input_text = file.read().strip()

    inputs = {"input_text": input_text}
    response_mode = args.response_mode if args.response_mode else "blocking"
    user = args.user

    result = run_workflow(inputs, response_mode, user)
    if result:
        print(result)

以下、各部分について詳しく説明します。

インポートと定数の定義

import requests
import json
import argparse
  • requests: HTTPリクエストを送るためのライブラリ。
  • json: JSONデータを操作するためのライブラリ。
  • argparse: コマンドライン引数を解析するためのライブラリ。
API_KEY = "app-XXXXXXXXXXXXXue"  # DifyのAPIキーを入れてください
BASE_URL = "http://localhost/v1"
  • API_KEY: Dify APIを使用するための認証キー。
  • BASE_URL: APIのベースURL。

ワークフローの実行関数

def run_workflow(inputs, response_mode, user):
    url = f"{BASE_URL}/workflows/run"
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    data = {
        "inputs": {"input_text": inputs["input_text"]},
        "response_mode": response_mode,
        "user": user
    }
    print(f"Sending request to {url} with data: {json.dumps(data, ensure_ascii=False)}")  # リクエストデータを出力
    response = requests.post(url, headers=headers, data=json.dumps(data))

    print(f"Received response with status code {response.status_code}")  # ステータスコードを出力

    if response.status_code == 200:
        if response_mode == "blocking":
            result = response.json()
            print(f"Response JSON: {result}")  # レスポンス全体を出力
            if "data" in result and "outputs" in result["data"] and "text" in result["data"]["outputs"]:
                return result["data"]["outputs"]["text"]
            else:
                print("Error: 'text' not found in the API response.")
        elif response_mode == "streaming":
            for chunk in response.iter_content(chunk_size=None):
                if chunk:
                    print(chunk.decode())
    else:
        print(f"Request failed with status code {response.status_code}")
        try:
            print(response.json())
        except json.JSONDecodeError:
            print(response.text)
  • run_workflow関数は、Difyのワークフローを実行します。
  • url: ワークフローを実行するためのAPIエンドポイント。
  • headers: APIリクエストのヘッダー。認証キーとJSONコンテンツタイプが含まれます。
  • data: リクエストデータ。inputsには入力テキスト、response_modeにはレスポンスモード、userにはユーザーIDが含まれます。
  • requests.post: リクエストを送信します。
  • レスポンスのステータスコードに応じて、適切な処理を行います。

ワークフローの停止関数

def stop_workflow(task_id, user):
    url = f"{BASE_URL}/workflows/{task_id}/stop"
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    data = {"user": user}
    response = requests.post(url, headers=headers, data=json.dumps(data))

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed with status code {response.status_code}")
  • stop_workflow関数は、実行中のワークフローを停止します。
  • url: ワークフローを停止するためのAPIエンドポイント。
  • data: リクエストデータ。ユーザーIDが含まれます。
  • リクエストを送信し、レスポンスを処理します。

パラメータの取得関数

def get_parameters(user):
    url = f"{BASE_URL}/parameters?user={user}"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed with status code {response.status_code}")
  • get_parameters関数は、APIからパラメータを取得します。
  • url: パラメータを取得するためのAPIエンドポイント。
  • リクエストを送信し、レスポンスを処理します。

メイン処理

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run workflow")
    parser.add_argument("--input", type=str, required=True, help="Input file path")
    parser.add_argument("--response_mode", type=str, help="Response mode (blocking or streaming)")
    parser.add_argument("--user", type=str, default="example_user", help="User identifier")

    args = parser.parse_args()

    with open(args.input, 'r', encoding='utf-8') as file:
        input_text = file.read().strip()

    inputs = {"input_text": input_text}
    response_mode = args.response_mode if args.response_mode else "blocking"
    user = args.user

    result = run_workflow(inputs, response_mode, user)
    if result:
        print(result)
  • コマンドライン引数を解析します。
    • --input: 入力ファイルのパス。
    • --response_mode: レスポンスモード(指定がない場合は"blocking")。
    • --user: ユーザー識別子。
  • 指定された入力ファイルからテキストを読み込みます。
  • run_workflow関数を呼び出し、結果を表示します。

実行例

以下のコマンドでスクリプトを実行します:

python3 dify-call.py --input "/home/xxc/dify/DATA/input.txt" --user "user"

このスクリプトを使用することで、簡単にDify APIを利用してワークフローを実行できます。必要な情報を入力ファイルにまとめ、コマンドラインからスクリプトを実行するだけです。これにより、ワークフローの実行とレスポンスの取得が自動化され、効率的に作業を進めることができます。なお、レスポンスモードが指定されていない場合は、デフォルトで"blocking"になります。

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5