センシンロボティクス開発部の黒田です。
弊社プロダクトの「SENSYN CORE」では、ドローンやUGVが取得してきた様々なデータをお客様が利用可能なフォーマットに加工したり、AIの推論処理にかけて異常検知をしたりと、重めのタスクを非同期で回す必要があるのですが、そのために一部Azure FunctionsのDurable Functionsを利用しています。
Durable Functions is 何?という話について詳細は公式ドキュメントに譲りますが、簡単に言うとサーバーレスなプラットフォームを利用する時に悩みがちなstate管理や複数関数のorchestrationをプログラムから制御することができ、しかも起動時間の限界も無制限にできるという大変イケてる機能です。
コーディングに慣れている人にとっては公式ドキュメントに記載されている次のコードを見るのが一番手っ取り早いです
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
F1 ~ F4が全て独立したAzure Functionになっており、それを順番に呼び出して関数チェーンを実現する処理が極めて直感的に記述できます。
もちろん、直列ではなく並列に実行することもできるので、多くのprimitiveなAzure Functionsを束ねて複雑なパイプライン処理を実現することも可能です。
これに加えて、「Durable Entity」という簡易な状態を管理するためのAzure Functionも実装することができるので、きっと誰もが一度は夢みたことがある(?)「ぼくのかんがえたさいきょうのサーバーレスオンリーアプリ」がさらに現実味を帯びてきたわけです。
ということで、今回はこのDurable Entityを組み込んだDurable Functionsで、簡単なカウンターとその監視アプリをTypeScriptで作ってみたいと思います。
カウンターアプリの実装
Durable Entityにカウントを保持させ、それをHttp TriggerからGETしたりカウントアップ・ダウンできるようなアプリを作ります。
想定仕様
- GET /api/counter で現在のカウントの取得
- パラメータcounter_idを指定してカウンターのインスタンスを切り替えれる(デフォルト:myCounter)
- POST /api/counter でカウントの操作
- パラメータcounter_idを指定してカウンターのインスタンスを切り替えれる(デフォルト:myCounter)
- パラメータoperationを指定して操作の種類を切り替えれる(デフォルト:plus)
- plus: インクリメント
- minus: デクリメント
- reset: 0にリセット
- get: 単純に現在の値を返却(後のOrchetratorで使用)
実装・デプロイする
VSCodeを使った手順が公式ドキュメントに記載されているので、こちらをベースにHttp Triggerな関数アプリを作成し、「CounterEntityClient」という名称にRenameします。
その上でindex.tsを以下のように実装します
import { AzureFunction, Context, HttpRequest } from "@azure/functions"
import * as df from "durable-functions";
async function httpRequestHandler(
context: Context, req: HttpRequest,
callback: (ctx: Context, r: HttpRequest) => unknown
) {
try {
const data = await callback(context, req);
context.res = {
body: {
ok: true,
data: data,
}
};
} catch (e) {
context.log(`Catched error: ${e.stack}`);
context.res = {
status: 500,
body: e.message
};
} finally {
context.done();
}
}
async function getCounter(context: Context, req: HttpRequest): Promise<any> {
context.log('Invoked Get Counter');
const client = df.getClient(context);
const counterId = req.query.counter_id ?? 'myCounter';
const entityId = new df.EntityId("CounterEntity", counterId);
const stateResponse = await client.readEntityState<number>(entityId);
if (!stateResponse.entityExists) {
return {
entityId: entityId.toString(),
value: -1,
};
}
return {
entityId: entityId.toString(),
value: stateResponse.entityState,
}
}
async function mutateCounter(context: Context, req: HttpRequest): Promise<boolean> {
context.log('Invoked Mutate Counter');
const client = df.getClient(context);
const counterId = req.query.counter_id ?? 'myCounter';
const entityId = new df.EntityId("CounterEntity", counterId);
const op = req.query.operation ?? 'plus';
await client.signalEntity(entityId, op);
return true;
}
// entry point
const httpTrigger: AzureFunction = async function (context: Context, req: HttpRequest): Promise<void> {
context.log(`HTTP trigger: ${req.method} ${req.url}`);
if (req.method === 'GET') {
return httpRequestHandler(context, req, getCounter);
} else if (req.method === 'POST') {
return httpRequestHandler(context, req, mutateCounter);
}
};
export default httpTrigger;
ちなみに function.json は以下のようになります
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"route": "counter",
"methods": [
"post",
"get"
]
},
{
"type": "http",
"direction": "out",
"name": "res"
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
}
],
"scriptFile": "../dist/CounterEntityClient/index.js"
}
上記のコードから分かる通り、カウンターアプリはGETとPOSTの2つのmethodをサポートしており、GETされた場合はCounterEntityの現在の値を返却し、POSTの場合クエリパラメータに指定されたoperationの値に従ってstate操作を行います。
次に、本題のDurable Entityを実装します。
この関数アプリのディレクトリをコピーし「CounterEntity」という名称にRenameして作ります(私の環境ではVS CodeからEntity Triggerな関数アプリを作成することができませんでしたので手作業で作成)。
こちらもindex.tsとfunction.jsonを以下に記載します
import { AzureFunction } from "@azure/functions"
import * as df from "durable-functions";
import { IEntityFunctionContext } from "durable-functions/lib/src/ientityfunctioncontext";
const entityTrigger: AzureFunction = async function (context: IEntityFunctionContext<number>): Promise<void> {
context.log(`Entity trigger: ${context.df.operationName} ${context.df.entityId.toString()}`);
const currentValue = context.df.getState(() => 0) as number;
switch(context.df.operationName) {
case "plus":
context.df.setState(currentValue + 1);
break;
case "minus":
context.df.setState(currentValue - 1);
break;
case "reset":
context.df.setState(0);
break;
case "get":
// nothing
break;
default:
context.log(`Unknown operation : ${context.df.operationName}`);
return; // error
}
context.df.return(currentValue);
};
export default df.entity(entityTrigger);
{
"bindings": [
{
"name": "context",
"type": "entityTrigger",
"direction": "in"
}
],
"disabled": false,
"scriptFile": "../dist/CounterEntity/index.js"
}
最終的な構成は以下になります
durable_functions_trial
├── CounterEntity
│ ├── function.json
│ └── index.ts
├── CounterEntityClient
│ ├── function.json
│ └── index.ts
├── dist
├── host.json
├── local.settings.json
├── node_modules
├── package-lock.json
├── package.json
├── proxies.json
└── tsconfig.json
ここまでできたらVS CodeのAzure Extension機能からAzureにデプロイします。
試す
operationを切り替えながらHttp Triggerを呼び出してみます(エンドポイントはダミーです)
# get(not exists)
$ curl -s 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx' | jq
{
"ok": true,
"data": {
"entityId": "@counterentity@myCounter",
"value": -1
}
}
# plus
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx&operation=plus' | jq
{
"ok": true,
"data": true
}
# confirm
$ curl -s 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx' | jq
{
"ok": true,
"data": {
"entityId": "@counterentity@myCounter",
"value": 1
}
}
# minus
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx&operation=minus' | jq
{
"ok": true,
"data": true
}
# confirm
$ curl -s 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx' | jq
{
"ok": true,
"data": {
"entityId": "@counterentity@myCounter",
"value": 0
}
}
counter_idも切り替えて試してみます
# myCounter2
$ curl -s 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx&counter_id=myCounter2' | jq
{
"ok": true,
"data": {
"entityId": "@counterentity@myCounter2",
"value": -1
}
}
# plus: myCounter2
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx&counter_id=myCounter2&operation=plus' | jq
{
"ok": true,
"data": true
}
# confirm: myCounter2
$ curl -s 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxx&counter_id=myCounter' | jq
{
"ok": true,
"data": {
"entityId": "@counterentity@myCounter2",
"value": 1
}
}
非常にシンプルなstate管理ではありますが、Azure Functionsのみでサーバーレスなカウンターアプリが実装できました。
実装方法やデプロイ手順には慣れが必要な感じはありますが、慣れてしまうと開発者体験としては非常にイケてると思います。
カウンターの監視アプリ
さて、次はこのカウンターアプリの値を監視して、特定の条件が満たされたら何かする(通知とか)ようなアプリをDurable FunctionsのOrchestratorを使って実装したいと思います。
通常、このような監視アプリを作ろうと思うと、
- HttpRequestなどの特定のイベントをトリガーとしてプロセスを起動する
- 起動されたプロセスは監視対象を確認し、条件を満たしていれば後段の処理をトリガーする
- 条件を満たしていなければ、次の確認プロセスが一定時間後に起動するようスケジューリングし、自身は終了する
- 監視に関する状態や、現在の値を何らかのstorageに格納しておき、クライアントから参照できるようにする(要求に応じて)
といった処理・手続きが必要になるかと思いますが、これを一手に担ってくれるのはDurable FunctionsのOrchestrator機能です。
想定仕様
-
POST /api/orchestratorによって監視プロセスが開始される
- 監視プロセスはmyCounter, myCounter2の2つのカウンターの値を確認する
- 2つのカウンターがいずれも5以上の値になるまで5秒おきに確認を続ける
- 条件が満たされたら他システムへの通知を行う
-
Azure FunctionsのOrchestratorが提供するstatusQueryGetUriエンドポイントから監視プロセスの実行ステータスが確認できる
- 具体的には GET /runtime/webhooks/durabletask/instances/{instanceId} で確認できる
実装・デプロイする
OrchestratorをトリガーするためのClient関数としてOrchestratorClientを以下のように実装します
import { AzureFunction, Context, HttpRequest } from "@azure/functions"
import * as df from "durable-functions";
// entry point
const httpTrigger: AzureFunction = async function (context: Context, req: HttpRequest): Promise<void> {
context.log(`HTTP trigger: ${req.method} ${req.url}`);
const client = df.getClient(context);
const instanceId = await client.startNew('Orchestrator', null, req.body);
context.log(`Started orchestration with ID = '${instanceId}'.`);
context.res = client.createCheckStatusResponse(context.bindingData.req, instanceId);
};
export default httpTrigger;
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"route": "orchestrator",
"methods": [
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "res"
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
}
],
"scriptFile": "../dist/OrchestratorClient/index.js"
}
こちらからTriggerされるOrchestratorは次のように実装します
import * as df from "durable-functions";
import { IOrchestrationFunctionContext } from "durable-functions/lib/src/iorchestrationfunctioncontext";
import dayjs from 'dayjs';
const orchestrationTrigger = function* (context: IOrchestrationFunctionContext): Generator<unknown, unknown, number> {
const entityId1 = new df.EntityId("CounterEntity", "myCounter");
const entityId2 = new df.EntityId("CounterEntity", "myCounter2");
const outputs = [null, null];
let index = 0;
let value1 = -1;
let value2 = -1;
while (index < 100) { // continueAsNewの方が望ましいがとりあえずwhile
// 5秒ごとにentityを確認
const deadline = dayjs(context.df.currentUtcDateTime).add(5, 'second');
yield context.df.createTimer(deadline.toDate());
if (value1 < 5) {
context.log(`call Entity1 ${index}`);
value1 = yield context.df.callEntity(entityId1, 'get');
} else if (outputs[0] === null) {
context.log(`call Entity1 Done`);
outputs[0] = {
entityId: entityId1.toString(),
value: value1,
};
}
if (value2 < 5) {
context.log(`call Entity2 ${index}`);
value2 = yield context.df.callEntity(entityId2, 'get');
} else if (outputs[1] === null) {
context.log(`call Entity2 done`);
outputs[1] = {
entityId: entityId2.toString(),
value: value2,
};
}
if (outputs[0] !== null && outputs[1] !== null) break;
index++;
}
context.log(outputs);
// 本来はここで外部サービスなどを経由して通知をトリガーする想定
return outputs;
};
export default df.orchestrator(orchestrationTrigger);
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"scriptFile": "../dist/Orchestrator/index.js"
}
上記のように、異なるAzure Functionをyieldを使ってプログラミックに読み出すことができ、本来長時間に渡る処理を実行できないサーバーレス環境においても、上記のようなループ処理を特別なインフラの追加なしに実現できます。
上記のOrchestratorとOrchestratorClientはEntityを扱う関数と同じプロジェクトで作成しましのたで、最終的な構成は次のようになります
durable_functions_trial
├── CounterEntity
│ ├── function.json
│ └── index.ts
├── CounterEntityClient
│ ├── function.json
│ └── index.ts
├── Orchestrator
│ ├── function.json
│ └── index.ts
├── OrchestratorClient
│ ├── function.json
│ └── index.ts
├── dist
├── host.json
├── local.settings.json
├── node_modules
├── package-lock.json
├── package.json
├── proxies.json
└── tsconfig.json
ちなみにdayjsを使用する過程で、tsconfigをカスタマイズしましたのでそちらも記載します
{
"compilerOptions": {
"module": "commonjs",
"moduleResolution": "node",
"target": "ES2018",
"outDir": "dist",
"rootDir": ".",
"sourceMap": true,
"strict": false,
"esModuleInterop": true, // 追加
"allowSyntheticDefaultImports": true, // 追加
}
}
準備が整ったらカウンターアプリと同様、VS Codeからデプロイします。
試す
さて、これらの動きを試すには少々複雑な手順が必要になりますが、ターミナルを複数立ち上げて頑張ります
1. OrchestratorClient経由でOrchestratorを起動する
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/orchestrator?code=xxxxxx' | jq
{
"id": "dc078...",
"statusQueryGetUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/....",
"sendEventPostUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/....",
"terminatePostUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/....",
"rewindPostUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/....",
"purgeHistoryDeleteUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/....",
"restartPostUri": "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/...."
}
2. responseにstatusQueryGetUriが含まれるので、それを控えておき、そのエンドポイントをcurlで定期的に呼び出してstatusを確認する
while true; do curl -s "https://durable_functions_trial.azurewebsites.net/runtime/webhooks/durabletask/instances/...." | jq; sleep 1; clear; done;
{
"name": "Orchestrator",
"instanceId": "dc078...",
"runtimeStatus": "Running",
"input": {},
"customStatus": null,
"output": null,
"createdTime": "2021-12-30T10:02:45Z",
"lastUpdatedTime": "2021-12-30T10:06:17Z"
}
3. CounterEntityClient経由でmyCounterおよびmyCounter2をインクリメントする
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxxxx&counter_id=myCounter' | jq
{
"ok": true,
"data": true
}
$ curl -s -X POST -d '{}' 'https://durable_functions_trial.azurewebsites.net/api/counter?code=xxxxxxxx&counter_id=myCounter2' | jq
{
"ok": true,
"data": true
}
4. 2つのカウンターを5回ずつインクリメントした後、Orchestratorがcompletedになることを確認する
{
"name": "Orchestrator",
"instanceId": "dc078....",
"runtimeStatus": "Completed",
"input": {},
"customStatus": null,
"output": [
{
"entityId": "@counterentity@myCounter",
"value": 5
},
{
"entityId": "@counterentity@myCounter2",
"value": 5
}
],
"createdTime": "2021-12-30T10:02:45Z",
"lastUpdatedTime": "2021-12-30T10:07:51Z"
}
できた。
なんとなく無駄に複雑な実装をした気がするけど気にしない。
注意点など
- Durable Entityはクライアントからの読み込みでは生成されず、あくまでoperationを実行するなどしてAzure Functionとして実行されてから初めて実行される
- 今回の例で言うと、 GET /api/counter では生成されず、 POST /api/counter を呼び出して初めて生成される
- Durable EntityはEntity関数呼び出し後即座に参照できるわけではなく、コールドスタートのように初回起動時は少し反映に時間差がある模様
- そのため、インクリメントを何回も呼び出しているのに、GETしてもカウンターの値が0に見えることがよくある
- 結果の整合性には十分注意して利用する必要がありそう
まとめなど
- Durable Functions最高
- 皆さまも良いサーバーレス新年をお迎えください