Azure Data Factory から Web Activityで Azure Durable Functionsを呼ぶ方法を記録しておきます。躓きポイントがあったので、そのメモです。
以前、Durable Functionsについてはこちらで書きました。
想定とつまづき
左のWeb1でオーケストレーター関数を呼び出して、ステータスを取得するためのURLを取得して、その後にそのURLに対して、完了するまでポーリングするような流れです。しかし、何度やってもWeb1で最終結果が返り、???状態でした。
これを受け取る想定(URL部分はlocalhostだが、これがFunctionsのHostになっている想定)。
$ curl -X POST http://localhost:7071/api/orchestrators/hello_orchestrator -H "Content-Type: application/json"
{
"id": "ecd9dd0ace804c61b04bd94f7da9fdfc",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/raiseEvent/{eventName}?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/terminate?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/rewind?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/restart?taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/suspend?reason={text}&taskHub=TestHubName&connection=Storage&code=BMPYKiPU07w6kBO6ZZL8RofyK2SHMhMa-XqZuv3KtKvIAzFuFhlOrQ==",
"resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/ecd9dd0ace804c61b04bd94f7da9fdfc/resume?reason={text}&taskHub=TestHubName&connection=Storage&co 後略"
}
原因
原因は「非同期パターンを無効にする」(turnOffAsync)がOFFになっていたことでした。この場合は、HTTP Status が202だとHTTP Response HeaderのLocationに記載されているURLにポーリングをして最終結果を取得する動きになります。便利ですね。
最終パイプライン
最終的には「非同期パターンを無効にする」(turnOffAsync)をOFFにしたままシンプルな実装に帰着しました。
全体
Web Activity
全般
「タイムアウト」を2時間に、「再試行のサイクル間隔(秒)」を30に設定。
設定
@concat(concat(variables('endpoint_uri'), '?code='), variables('code'))
If condition
@and(
contains(activity('Invoke Durable Functions').output, 'runtimeStatus'),
or(
equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Failed'),
equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Terminated')
)
)
以下で取り得るステータス値を確認しました。
JSON定義
JSONでの定義です。
{
"name": "call_durable_function",
"properties": {
"activities": [
{
"name": "Invoke Durable Functions",
"type": "WebActivity",
"dependsOn": [],
"policy": {
"timeout": "0.02:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"method": "POST",
"url": {
"value": "@concat(concat(variables('endpoint_uri'), '?code='), variables('code'))",
"type": "Expression"
},
"body": {
"value": "@replace(replace('{\"source_file\": \"<source_file>\", \"target_file\": \"<target_file>\"}', '<source_file>', pipeline().parameters.source_file), '<target_file>', pipeline().parameters.target_file)",
"type": "Expression"
}
}
},
{
"name": "StatusCheck",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Invoke Durable Functions",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@and(\n contains(activity('Invoke Durable Functions').output, 'runtimeStatus'),\n or(\n equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Failed'),\n equals(activity('Invoke Durable Functions').output.runtimeStatus, 'Terminated')\n )\n)",
"type": "Expression"
},
"ifTrueActivities": [
{
"name": "Failed",
"type": "Fail",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"message": "JobTerminated",
"errorCode": "Job has failed or has been canceled"
}
}
]
}
}
],
"variables": {
"code": {
"type": "String",
"defaultValue": "<function key>"
},
"endpoint_uri": {
"type": "String",
"defaultValue": "https://<function host>/<path>"
}
},
"annotations": [],
"lastPublishTime": "2025-10-01T13:54:00Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}