2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Durable FunctionsをPythonで実装

Last updated at Posted at 2025-07-13

Azure Durable Functionsを試した時のメモです。以下の記事のこの絵のパターンがやりたくて、試してみました。理由は、LLMを使う少し長めのStatusを持ったFunctionsで、自動でStatus管理してくれるのを見たかったからです。
image.png

主に以下の記事から試しました。

Step

0. 前提

「サポートされている言語」にはPython3.7以上として書いていないのですが、「クイックスタート」には「Python バージョン 3.7、3.8、3.9、または 3.10 がインストールされている。」と書かれているので、Python 3.10で試しました。3.11以降で動くかは確認していないです。

種類 Version 備考
OS Ubuntu22.04.5 LTS WSL2で動かしています
Python 3.10.6 少し古いです
Poetry 2.1.3 仮想環境の管理に使用

Python パッケージ

種類 Version 備考
azure-functions 1.23.0
azure-functions-durable 1.3.2

1. プログラム実装

1.1. 初期設定

コマンドパレットからプロジェクト作成
image.png

「クイックスタート」と同じ選択
image.png

1.2. メインプログラム

クイックスタートと以下の点を変更しています。

  • パラメータから関数名を取得しない(するとエラーが起きたので)
  • カスタムステータスを設定
  • HTTP RequestのBodyを読み込む(ただPrintするだけで不使用)
  • Activityをもう1つ試す
  • Activityにスリープ処理を追加(ステータス変化を追うため)

プログラム全体です。

function_app.py
import time
import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# An HTTP-triggered function with a Durable Functions client binding
@myApp.route(route="orchestrators/hello_orchestrator")
#@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClient):
#    function_name = req.route_params.get('functionName')
#    instance_id = await client.start_new(function_name)
    payload = req.get_json()
    instance_id = await client.start_new("hello_orchestrator", client_input=payload)
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    payload = context.get_input()
    print(payload)
    context.set_custom_status("Started")  
    result1 = yield context.call_activity("hello", "Seattle")
    context.set_custom_status({"stage": 1, "progress": 33})
    result2 = yield context.call_activity("hello2", "Tokyo")
    context.set_custom_status({"stage": 2, "progress": 66})
    result3 = yield context.call_activity("hello", "London")
    context.set_custom_status("Completed")  

    return [result1, result2, result3]

# Activity
@myApp.activity_trigger(input_name="city")
def hello(city: str):
    time.sleep(10)
    return f"Hello {city}"

# Activity
@myApp.activity_trigger(input_name="city")
def hello2(city: str):
    time.sleep(10)
    return f"Good day {city}"

以下、個別の解説。

クライアント関数

処理のトリガーの受け取り口となる関数。クイックスタートの通りreq.route_params.get('functionName')を実装したらここでエラー。今回は関数固定なので動的に取得せずにclient.start_newに固定値hello_orchestratorを渡しています。

http_start
# An HTTP-triggered function with a Durable Functions client binding
@myApp.route(route="orchestrators/hello_orchestrator")
#@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClient):
#    function_name = req.route_params.get('functionName')
#    instance_id = await client.start_new(function_name)
    payload = req.get_json()
    instance_id = await client.start_new("hello_orchestrator", client_input=payload)
    response = client.create_check_status_response(req, instance_id)
    return response

オーケストレーター関数

ここで実行する内容を書きます。set_custom_statusでカスタムステータスを設定。

hello_orchestrator
# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    payload = context.get_input()
    print(payload)
    context.set_custom_status("Started")  
    result1 = yield context.call_activity("hello", "Seattle")
    context.set_custom_status({"stage": 1, "progress": 33})
    result2 = yield context.call_activity("hello2", "Tokyo")
    context.set_custom_status({"stage": 2, "progress": 66})
    result3 = yield context.call_activity("hello", "London")
    context.set_custom_status("Completed")  

    return [result1, result2, result3]

1.3. ストレージ エミュレーター

ストレージ エミュレーター設定のため、local.settings.json を変更

local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python"
  }
}

2. ローカルでのテスト

2.1. 起動

普通のFunctionと同様、コマンドパレットからAzurite を起動し、Functionsもfunc host start -wで起動

$ func host start -w
Found Python version 3.10.12 (python3).

Azure Functions Core Tools
Core Tools Version:       4.0.6821 Commit hash: N/A +c09a2033faa7ecf51b3773308283af0ca9a99f83 (64-bit)
Function Runtime Version: 4.1036.1.23224

[2025-07-13T06:32:14.029Z] Worker process started and initialized.

Functions:

        http_start:  http://localhost:7071/api/orchestrators/hello_orchestrator

        hello: activityTrigger

        hello2: activityTrigger

        hello_orchestrator: orchestrationTrigger

For detailed output, run func with --verbose flag.
[2025-07-13T06:32:18.975Z] Host lock lease acquired by instance ID '00000000000000000000000063AA5621'.

2.2. 実行

Curlでhttp://localhost:7071/api/orchestrators/hello_orchestratorにリクエスト投げると以下のResonseが返ります。これで実行完了です。JSONは見やすいように改行など整形しています(以下同じ)。

Response
$ curl -X POST   http://localhost:7071/api/orchestrators/hello_orchestrator   -H "Content-Type: application/json"   -d '{
    "city": "Tokyo",
    "count": 3
  }'
{
    "id": "ecd9dd0ace804c61b04bd94f7da9fdfc",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/restart?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/resume?reason={text}&taskHub=TestHubName&connection=Storage&co   後略"
}

statusQueryGetUriの値のURLにブラウザでアクセスします。何回かアクセスするとステータスが遷移していくのがわかります。

開始時
{
    "name": "hello_orchestrator",
    "instanceId": "ecd9dd0ace804c61b04bd94f7da9fdfc",
    "runtimeStatus": "Running",
    "input": "{\"city\": \"Tokyo\", \"count\": 3}",
    "customStatus": "Started",
    "output": null,
    "createdTime": "2025-07-13T06:35:23Z",
    "lastUpdatedTime": "2025-07-13T06:35:23Z"
}
途中(2つ目実行中)
{
    "name": "hello_orchestrator",
    "instanceId": "ecd9dd0ace804c61b04bd94f7da9fdfc",
    "runtimeStatus": "Running",
    "input": "{\"city\": \"Tokyo\", \"count\": 3}",
    "customStatus": {
        "stage": 1,
        "progress": 33
    },
    "output": null,
    "createdTime": "2025-07-13T06:44:19Z",
    "lastUpdatedTime": "2025-07-13T06:44:30Z"
}
完了時
{
    "name": "hello_orchestrator",
    "instanceId": "ecd9dd0ace804c61b04bd94f7da9fdfc",
    "runtimeStatus": "Completed",
    "input": "{\"city\": \"Tokyo\", \"count\": 3}",
    "customStatus": "Completed",
    "output": [
        "Hello Seattle",
        "Good day Tokyo",
        "Hello London"
    ],
    "createdTime": "2025-07-13T06:44:19Z",
    "lastUpdatedTime": "2025-07-13T06:44:50Z"
}

多分、動き同じなので、今回はデプロイまでは検証しないです。

おまけ

Pythonファイルの分割

ブループリントを使って、Pythonファイルの分割が可能です。実務上、構造化のためには分けるのが必須になると思います。
こんな構造で分けます。

project_root/
├─ function_app.py              # 最小(登録だけ)
├─ clients/
│  └─ client_001.py             # クライアント関数1
├─ host.json
└─ requirements.txt
function_app.py
import azure.functions as func
import azure.durable_functions as df

from clients.client_001 import bp as client001_bp

# Durable Functions 用のアプリ(DFApp)
app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# 別ファイルの Blueprint を登録するだけ
app.register_functions(client001_bp)
clients/client_001.py
import azure.functions as func
import azure.durable_functions as df

# Durable 用 Blueprint
bp = df.Blueprint()

@bp.route(route="orchestrators/hello_orchestrator")
@bp.durable_client_input(client_name="client")
async def client_001(req: func.HttpRequest, client: df.DurableOrchestrationClient):
    try:
        payload = req.get_json()
    except ValueError:
        payload = None

    instance_id = await client.start_new("hello_orchestrator", None, payload)
    # 標準のチェックステータス応答を返す
    return client.create_check_status_response(req, instance_id)

# Orchestrator
@bp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    payload = context.get_input()
    print(payload) # 確認
    result1 = yield context.call_activity("hello", "Seattle")
    return result1

# Activity
@bp.activity_trigger(input_name="city")
def hello(city: str):
    return f"Hello {city}"

非同期関数

非同期でActivity 関数を定義してみます。先ほどのブループリントの続きです。

client_001.py
import azure.functions as func
import azure.durable_functions as df

# Durable 用 Blueprint
bp = df.Blueprint()

@bp.route(route="orchestrators/hello_orchestrator")
@bp.durable_client_input(client_name="client")
async def client_001(req: func.HttpRequest, client: df.DurableOrchestrationClient):
    instance_id = await client.start_new("hello_orchestrator", None, 1)
    # 標準のチェックステータス応答を返す
    return client.create_check_status_response(req, instance_id)

# Orchestrator
@bp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    num = context.get_input()
    print(num)    
    a_out = yield context.call_activity("act_one", num)
    b_out = yield context.call_activity("act_one", a_out)

    return {"A": a_out, "B": b_out}

# Async Activity
@bp.activity_trigger(input_name="num")
async def act_one(num: int):
    return await act_two(num) 

async def act_two(num: int):
    return num + 2

実行後のステータス取得すると以下の結果。非同期呼び出した結果"output":{"A":3,"B":5}が出ています。

{"name":"hello_orchestrator","instanceId":"5b2f7abad885488b862d32c0e79df95a",
"runtimeStatus":"Completed","input":"1","customStatus":null,
"output":{"A":3,"B":5},"createdTime":"2025-08-31T01:45:56Z","lastUpdatedTime":"2025-08-31T01:45:57Z"}

注意点

いくつかはまった点・調べた点です。

  • クライアント関数を増やした場合は、Functionsの再起動が必要(自動反映されるかと勘違いし、10分くらい試していました)
  • Activity関数のinput_nameはアンダーバーを入れるとエラー「The 'activity_read_source' function is in error: The binding name source_file is invalid. Please assign a valid name to the binding.」。ただ、いずれ直るかも。Please support the underscore character as a binding name
  • Activity関数の入出力はJSON化できないとNG。dict形式で持っても値がインスタンスのオブジェクトだとエラー発生。これは、入出力をストレージに保存しているから、らしい。

オーケストレーター関数は長時間実行される可能性があります。 "オーケストレーション インスタンス" の全体的な継続時間は、秒、日、月の単位になる場合、またはいつまでも終了しない場合があります。

  • ローカルテストでfunc host start時に以下のエラーが出て起動できず。azuriteのcacheを消すと成功。
[2025-09-21T02:50:11.351Z] There was an error performing a read operation on the Blob Storage Secret Repository.
[2025-09-21T02:50:11.352Z] Azure.Storage.Blobs: Service request failed.
[2025-09-21T02:50:11.352Z] Status: 500 (Internal Server Error)
rm __azurite_db_*.json
rm -rf __blobstorage__ __queuestorage__
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?