0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Data Factory から Durable functions 呼び出しでの注意点

Posted at

Azure Data Factory から Web Activityで Azure Durable Functionsを呼ぶ方法を記録しておきます。躓きポイントがあったので、そのメモです。
以前、Durable Functionsについてはこちらで書きました。

想定とつまづき

もともとはこんなパイプラインを作ろうかと考えていました。
image.png

左のWeb1でオーケストレーター関数を呼び出して、ステータスを取得するためのURLを取得して、その後にそのURLに対して、完了するまでポーリングするような流れです。しかし、何度やってもWeb1で最終結果が返り、???状態でした。
これを受け取る想定(URL部分はlocalhostだが、これがFunctionsのHostになっている想定)。

オーケストレーター関数呼び出し
$ curl -X POST   http://localhost:7071/api/orchestrators/hello_orchestrator -H "Content-Type: application/json"
{
    "id": "ecd9dd0ace804c61b04bd94f7da9fdfc",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/restart?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
    "resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/resume?reason={text}&taskHub=TestHubName&connection=Storage&co   後略"
}

原因

原因は「非同期パターンを無効にする」(turnOffAsync)がOFFになっていたことでした。この場合は、HTTP Status が202だとHTTP Response HeaderのLocationに記載されているURLにポーリングをして最終結果を取得する動きになります。便利ですね。
image.png

最終パイプライン

最終的には「非同期パターンを無効にする」(turnOffAsync)をOFFにしたままシンプルな実装に帰着しました。
image.png

全体

変数にFunctionsのURLとKeyを定義。
image.png

Web Activity

全般

「タイムアウト」を2時間に、「再試行のサイクル間隔(秒)」を30に設定。
image.png

設定

image.png
呼び出すURLの式。「本文」は特に汎用性ないようで省略

URL
@concat(concat(variables('endpoint_uri'), '?code='), variables('code'))

If condition

Trueでエラーを起こします。その条件は以下。
image.png

@and(
  contains(activity('Invoke Durable Functions').output, 'runtimeStatus'),
  or(
    equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Failed'),
    equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Terminated')
  )
)

以下で取り得るステータス値を確認しました。

JSON定義

JSONでの定義です。

pipiline
{
    "name": "call_durable_function",
    "properties": {
        "activities": [
            {
                "name": "Invoke Durable Functions",
                "type": "WebActivity",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.02:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "method": "POST",
                    "url": {
                        "value": "@concat(concat(variables('endpoint_uri'), '?code='), variables('code'))",
                        "type": "Expression"
                    },
                    "body": {
                        "value": "@replace(replace('{\"source_file\": \"<source_file>\", \"target_file\": \"<target_file>\"}', '<source_file>', pipeline().parameters.source_file), '<target_file>', pipeline().parameters.target_file)",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "StatusCheck",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "Invoke Durable Functions",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@and(\n  contains(activity('Invoke Durable Functions').output, 'runtimeStatus'),\n  or(\n    equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Failed'),\n    equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Terminated')\n  )\n)",
                        "type": "Expression"
                    },
                    "ifTrueActivities": [
                        {
                            "name": "Failed",
                            "type": "Fail",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "message": "JobTerminated",
                                "errorCode": "Job has failed or has been canceled"
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "code": {
                "type": "String",
                "defaultValue": "<function key>"
            },
            "endpoint_uri": {
                "type": "String",
                "defaultValue": "https://<function host>/<path>"
            }
        },
        "annotations": [],
        "lastPublishTime": "2025-10-01T13:54:00Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?