はじめに
Azure FunctionsのHttpトリガーは決められた時間以内にレスポンスを返さないといけないため、その時間を超えてFunctionを実行したいときはDurable Functionsを利用します。
概要
HttpでFunctionの起動を受け付け、Http Status Codeは202 Acceptedを返却します。
Functionの実行を管理するURLの一覧がJsonでレスポンスされます。
そのURLを使って、Functionの実行状態を管理します。
例えば、Runningは実行中。Completedは完了。Failedは失敗。となります。
このURLを定期的に実行すれば、バッチの実行状態も確認することができるようになります。
実装
npm install durable-functions
でパッケージをインストールします。
durable functionでは、以下の3つの関数を利用します。
- "オーケストレーター関数" - 他の関数を調整するワークフローを記述します。
- "アクティビティ関数" - オーケストレーター関数によって呼び出され、作業を実行し、必要に応じて値を返します。
- クライアント関数 - オーケストレーター関数を開始する通常の Azure Functions。 今回は、HTTP によってトリガーされる関数を使用します。
① トリガーを定義
app.http('Warehouse_ImportEcStock', {
route: 'orchestrator/{orchestratorName}',
extraInputs: [df.input.durableClient()],
handler: sampleHttpStart,
methods: ['GET'],
authLevel: 'function'
})
Function名
route URL {} で動的に実行するFunctionを変更できる
handler このURLで実行されるハンドラー
methods 必要に応じてPOSTなども使う
authLevel functionにすることで利用するのに関数keyが必要になる
② ハンドラーを定義
①で設定したハンドラーを実装します。
「routeで設定したorchestratorName で実行していること」
「この時点ではresponseをすぐにreturnしていること」
の二つを知っておけばよい。
const sampleHttpStart: HttpHandler = async (request: HttpRequest, context: InvocationContext): Promise<HttpResponse> => {
const client = df.getClient(context)
const params = request.query.toString()
const instanceId: string = await client.startNew(request.params?.orchestratorName ?? '', { input: params })
context.log(`Started orchestration with ID = '${instanceId}'.`)
return client.createCheckStatusResponse(request, instanceId)
}
③ オーケストレーター定義
②で指定された実行するオーケストレーターをdfにハンドラーを指定して登録
ハンドラー(sampleOrchestrator)を実装する
ここで実行するfunctionを定義したり、output(レスポンス)を構築できる
ここで実行するfunctionはactivityとしてdfに登録する
オーケストレーターはあくまでもワークフローであること注意する
const activityName = 'runSample'
const sampleOrchestrator: OrchestrationHandler = function * (context: OrchestrationContext) {
const outputs: df.Task[] = []
const durableContext = context.df as any
outputs.push(yield context.df.callActivity(activityName, durableContext.input))
return outputs
}
// durable-functions オーケストレーターを追加
df.app.orchestration('sampleOrchestrator', sampleOrchestrator)
④ アクティビティは作業を実行し、必要に応じて値を返します
ここでは、パラメータを受け取り、sample関数を実行
実行完了後、APIのレスポンスに表示するJSONを定義する
今回はCompletedと表示した
const runSample: ActivityHandler = async (queryString: string): Promise<string> => {
const params = new URLSearchParams(queryString)
const hoge = params.get('hoge') ?? ''
await sample(hoge)
return 'Completed'
}
// 実際に実行される処理をdurable-functionsに登録
df.app.activity(activityName, { handler: runSample })
dfに登録と実装がいくつか必要になるが、こんなものなんだなぁの理解で大丈夫。
実行
http://localhost:7071/api/ 固定
routeに定義したorchestrator/{orchestratorName}
オーケーストレーター名を使って実行する
今回はGETパラメータを利用している。
即座にレスポンスが返ってくる
{
"id": "004e6eec52314f93a3e423c91062afe2",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/.../raiseEvent/{eventName}?",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"restartPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/...",
"resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/..."
}
statusQueryGetUriのURIでFunctionの実行状態を取得することができる.
terminatePostUriなども用意されているが、これらは実際にFunctionが終了するわけではなく、このJsonのステータスが変更になるだけであることに注意する
HTTPトリガーでレスポンスタイムに悩まされたときは、Durable Functionsの利用を検討するのも良いかも。