LoginSignup
7
7

More than 3 years have passed since last update.

はじめに

2020/06/24、どうやらPythonのDurable FunctionsがPublic Previewになったらしい・・・

Durable Functions now supports Python

ということで、ささっとローカルで試してみました。

検証環境

ドキュメントGitHub をもとに、以下のような検証環境を用意しました。

  • Windows 10 Pro 64bit
  • Visual Studio Code
  • Azure Functions拡張機能
  • Node.js 12.18.0
  • Azure Functions Core Tools 3.0.2630
    • 3.0.2630 以上が必須
  • Python 3.7.7
    • Python 3.6 以降が必須
  • Azure Storage Emulator 5.10.0.0
    • ローカルデバッグで使用

プロジェクトの作成

Azure Functions拡張機能を使ってプロジェクトを作成します。
拡張機能のプロンプトに沿ってPythonの関数プロジェクトを作成します。
image.png

作成されたプロジェクトの「requirements.txt」を開き、Durable Functions用モジュール「azure-functions-durable>=1.0.0b6」を書き加えます。
image.png

VS Codeのターミナルを開き、プロジェクト内に作成されているPython仮想環境をactivateします。
Python仮想環境に「requirements.txt」でモジュールをインストールします。

> .\.venv\Scripts\activate
> python -m pip install -r requirements.txt

image.png

関数の作成

プロジェクトが作成されたら、Orchestrator、Activity、Clientの関数をそれぞれ作成していきます。
ドキュメントでは、Durable Functions用テンプレートを使用することになっていますが、現時点ではテンプレートがないため、「Http Trigger」のテンプレートで作って「_init_.py」、「functions.json」の中身を書き換えます。

image.png

Activity関数

「Http Trigger」のテンプレートで「durable-activity」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
内容としては、関数に渡された値の頭に「Hello」をつけて返すだけの簡単な処理です。
関数の実行状態を分かりやすくするため処理に2秒のWaitをかけ、ログに「Activity {name}」が表示されるようにしています。

_init_.py
import logging
import time

def main(name: str) -> str:

    time.sleep(2)

    logging.warning(f"Activity {name}")
    return f'Hello {name}!'
functions.json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "name",
      "type": "activityTrigger",
      "direction": "in",
      "datatype": "string"
    }
  ],
  "disabled": false
}

Orchestrator関数

「Http Trigger」のテンプレートで「durable-orchestrator」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
先ほど作成したActivity関数に「Tokyo」、「Seattle」、「London」の値を渡し、結果を配列に格納しています。
各呼び出しにyieldをつけて関数チェーンとして動作させています。
Activity関数の呼び出し方法は、関数がスネークケースになったぐらいでJavaScriptと変わりません。

_init_.py
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Activity関数の呼び出し
    task1 = yield context.call_activity("durable-activity", "Tokyo")
    task2 = yield context.call_activity("durable-activity", "Seattle")
    task3 = yield context.call_activity("durable-activity", "London")

    outputs = [task1, task2, task3]
    return outputs


main = df.Orchestrator.create(orchestrator_function)
functions.json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ],
  "disabled": false
}

Client関数

「Http Trigger」のテンプレートで「durable-client」の関数を作成し、「_init_.py」、「functions.json」を下記の内容で書き換えます。
「client.start_new」でOrchestrator関数を指定して呼び出しをしています。
こちらも呼び出し方法は、JavaScriptと変わりませんが、バインドのtypeがJavaScriptでは「durableClient」なのに対し、「orchestrationClient」と変わっているところもあります。

_init_.py
import logging

from azure.durable_functions import DurableOrchestrationClient
import azure.functions as func

async def main(req: func.HttpRequest, starter: str, message):

    logging.info(starter)
    client = DurableOrchestrationClient(starter)

    # Orchestratorの開始
    instance_id = await client.start_new('durable-orchestrator')
    response = client.create_check_status_response(req, instance_id)
    message.set(response)
functions.json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "post",
        "get"
      ]
    },
    {
      "direction": "out",
      "name": "message",
      "type": "http"
    },
    {
      "name": "starter",
      "type": "orchestrationClient",
      "direction": "in",
      "datatype": "string"
    }
  ]
}

関数のローカル実行

関数が作成できたので、ローカルで実行してみます。
Durable Functionsは仕組上Azure Storageが必要なため、ローカルだけで実行する場合はAzure Storage Emulatorを使用します。
Emulatorを使用する場合は、プロジェクトのrootにある「local.settings.json」を開いて「AzureWebJobsStorage」に「UseDevelopmentStorage=true」を代入し、Azure Storage Emulatorを開始します。

local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python"
  }
}

image.png

F5キーでデバッグモードを開始します。
プロセスが起動するとClient関数のURLが表示されるので、Postman等のクライアントでアクセスします。
image.png

PostmanでGetリクエストをするとターミナルにログが流れ始め、レスポンスとして状態管理用URLが返されます。
image.png

表示されたログを確認するとWarningの黄文字が「Activity Tokyo」、「Activity Seattle」、「Activity London」の順でおおよそ2秒ごとに表示されていることがわかります。
関数チェーンが正しく機能していることがわかります。
image.png

レスポンスの「statusQueryGetUri」でOrchestratorの状態を確認すると、runtimeStatusが「Completed」となっており、outputとして「Hello Tokyo!」、「Hello Seattle!」、「Hello London!」の配列が得られました。
image.png

Fan-Out/Fan-In

せっかくなのでFan-Out/Fan-Inも試してみます。
Orchestrator関数を下記のように書き換えます。

_init_.py
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Activity関数の呼び出し
    # task1 = yield context.call_activity("durable-activity", "Tokyo")
    # task2 = yield context.call_activity("durable-activity", "Seattle")
    # task3 = yield context.call_activity("durable-activity", "London")

    # outputs = [task1, task2, task3]

    tasks = []
    tasks.append(context.call_activity("durable-activity", "Tokyo"))
    tasks.append(context.call_activity("durable-activity", "Seattle"))
    tasks.append(context.call_activity("durable-activity", "London"))

    outputs = yield context.task_all(tasks)

    return outputs


main = df.Orchestrator.create(orchestrator_function)

書き換えたものを実行してみました。
「Tokyo」、「Seattle」、「London」の順で登録したタスクが「Seattle」、「London」、「Tokyo」と順不同で処理されました。
ただ、処理はそれぞれ2秒ごとに発生しておりFan-Out/Fan-Inで期待される並列処理にはなっていないようでした。まだプレビュー段階なので今後に期待といったところでしょうか。
image.png

まとめ

Python版とはいえDurable Functionsなので、スクリプトのお作法はJavaScriptと変わっておらず、JavaScriptでの開発経験があれば難なく開発はできそうです。
リリースにも書かれているように、Pythonの場合は機械学習やデータ分析方面での活用が期待でき、サーバレスでデータの並列処理環境を構築するといったシナリオが新たに可能になったというのは活用方法として面白そうです。

参考

7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7