はじめに
2020/06/24、どうやらPythonのDurable FunctionsがPublic Previewになったらしい・・・
Durable Functions now supports Python
ということで、ささっとローカルで試してみました。
検証環境
ドキュメントと GitHub をもとに、以下のような検証環境を用意しました。
- Windows 10 Pro 64bit
- Visual Studio Code
- Azure Functions拡張機能
- Node.js 12.18.0
- Azure Functions Core Tools 3.0.2630
- 3.0.2630 以上が必須
- Python 3.7.7
- Python 3.6 以降が必須
- Azure Storage Emulator 5.10.0.0
- ローカルデバッグで使用
プロジェクトの作成
Azure Functions拡張機能を使ってプロジェクトを作成します。
拡張機能のプロンプトに沿ってPythonの関数プロジェクトを作成します。
作成されたプロジェクトの「requirements.txt」を開き、Durable Functions用モジュール「azure-functions-durable>=1.0.0b6」を書き加えます。
VS Codeのターミナルを開き、プロジェクト内に作成されているPython仮想環境をactivateします。
Python仮想環境に「requirements.txt」でモジュールをインストールします。
> .\.venv\Scripts\activate
> python -m pip install -r requirements.txt
関数の作成
プロジェクトが作成されたら、Orchestrator、Activity、Clientの関数をそれぞれ作成していきます。
ドキュメントでは、Durable Functions用テンプレートを使用することになっていますが、現時点ではテンプレートがないため、「Http Trigger」のテンプレートで作って「_init_.py」、「functions.json」の中身を書き換えます。
Activity関数
「Http Trigger」のテンプレートで「durable-activity」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
内容としては、関数に渡された値の頭に「Hello」をつけて返すだけの簡単な処理です。
関数の実行状態を分かりやすくするため処理に2秒のWaitをかけ、ログに「Activity {name}」が表示されるようにしています。
import logging
import time
def main(name: str) -> str:
time.sleep(2)
logging.warning(f"Activity {name}")
return f'Hello {name}!'
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in",
"datatype": "string"
}
],
"disabled": false
}
Orchestrator関数
「Http Trigger」のテンプレートで「durable-orchestrator」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
先ほど作成したActivity関数に「Tokyo」、「Seattle」、「London」の値を渡し、結果を配列に格納しています。
各呼び出しにyieldをつけて関数チェーンとして動作させています。
Activity関数の呼び出し方法は、関数がスネークケースになったぐらいでJavaScriptと変わりません。
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Activity関数の呼び出し
task1 = yield context.call_activity("durable-activity", "Tokyo")
task2 = yield context.call_activity("durable-activity", "Seattle")
task3 = yield context.call_activity("durable-activity", "London")
outputs = [task1, task2, task3]
return outputs
main = df.Orchestrator.create(orchestrator_function)
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Client関数
「Http Trigger」のテンプレートで「durable-client」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
「client.start_new」でOrchestrator関数を指定して呼び出しをしています。
こちらも呼び出し方法は、JavaScriptと変わりませんが、バインドのtypeがJavaScriptでは「durableClient」なのに対し、「orchestrationClient」と変わっているところもあります。
import logging
from azure.durable_functions import DurableOrchestrationClient
import azure.functions as func
async def main(req: func.HttpRequest, starter: str, message):
logging.info(starter)
client = DurableOrchestrationClient(starter)
# Orchestratorの開始
instance_id = await client.start_new('durable-orchestrator')
response = client.create_check_status_response(req, instance_id)
message.set(response)
{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "anonymous",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": [
"post",
"get"
]
},
{
"direction": "out",
"name": "message",
"type": "http"
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in",
"datatype": "string"
}
]
}
関数のローカル実行
関数が作成できたので、ローカルで実行してみます。
Durable Functionsは仕組上Azure Storageが必要なため、ローカルだけで実行する場合はAzure Storage Emulatorを使用します。
Emulatorを使用する場合は、プロジェクトのrootにある「local.settings.json」を開いて「AzureWebJobsStorage」に「UseDevelopmentStorage=true」を代入し、Azure Storage Emulatorを開始します。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "python"
}
}
F5キーでデバッグモードを開始します。
プロセスが起動するとClient関数のURLが表示されるので、Postman等のクライアントでアクセスします。
PostmanでGetリクエストをするとターミナルにログが流れ始め、レスポンスとして状態管理用URLが返されます。
表示されたログを確認するとWarningの黄文字が「Activity Tokyo」、「Activity Seattle」、「Activity London」の順でおおよそ2秒ごとに表示されていることがわかります。
関数チェーンが正しく機能していることがわかります。
レスポンスの「statusQueryGetUri」でOrchestratorの状態を確認すると、runtimeStatusが「Completed」となっており、outputとして「Hello Tokyo!」、「Hello Seattle!」、「Hello London!」の配列が得られました。
Fan-Out/Fan-In
せっかくなのでFan-Out/Fan-Inも試してみます。
Orchestrator関数を下記のように書き換えます。
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Activity関数の呼び出し
# task1 = yield context.call_activity("durable-activity", "Tokyo")
# task2 = yield context.call_activity("durable-activity", "Seattle")
# task3 = yield context.call_activity("durable-activity", "London")
# outputs = [task1, task2, task3]
tasks = []
tasks.append(context.call_activity("durable-activity", "Tokyo"))
tasks.append(context.call_activity("durable-activity", "Seattle"))
tasks.append(context.call_activity("durable-activity", "London"))
outputs = yield context.task_all(tasks)
return outputs
main = df.Orchestrator.create(orchestrator_function)
書き換えたものを実行してみました。
「Tokyo」、「Seattle」、「London」の順で登録したタスクが「Seattle」、「London」、「Tokyo」と順不同で処理されました。
ただ、処理はそれぞれ2秒ごとに発生しておりFan-Out/Fan-Inで期待される並列処理にはなっていないようでした。まだプレビュー段階なので今後に期待といったところでしょうか。
まとめ
Python版とはいえDurable Functionsなので、スクリプトのお作法はJavaScriptと変わっておらず、JavaScriptでの開発経験があれば難なく開発はできそうです。
リリースにも書かれているように、Pythonの場合は機械学習やデータ分析方面での活用が期待でき、サーバレスでデータの並列処理環境を構築するといったシナリオが新たに可能になったというのは活用方法として面白そうです。