0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Event Grid の 30 秒タイムアウト制限を Azure Durable Functions で回避する

0
Last updated at Posted at 2026-05-17

概要

Azure Event Grid には、サブスクライバー(Azure Functions など)の処理が 30 秒以内に完了しない場合にリトライが発生する という仕様があります。本記事では、Azure Durable Functions を活用してこの制約を回避できるかを検証した結果を共有します。

具体的には、Event Grid トリガー関数の役割を 「オーケストレーションを起動するだけ」 に限定し、長時間処理は Durable Functions の Activity 関数内で実行するパターンを検証します。これにより、Event Grid から見たエンドポイントは常に短時間で応答を返すため、30 秒タイムアウトの影響を受けずに任意の長さの処理を完遂できます。

image.png

Azure Event Grid のリトライ仕様そのものについては、下記の記事で詳しく検証しています。

アーキテクチャ概要

直接実行パターンと Durable Functions パターンの比較

観点 直接実行パターン Durable Functions パターン(本記事)
Event Grid から呼ばれる関数の責務 長時間処理を直接実行 オーケストレーションを起動して即座に返却
長時間処理の実行場所 Event Grid トリガー関数 Activity 関数
30 秒タイムアウトの影響 受ける(リトライが発生) 受けない
重複配送・リトライ対策 個別実装が必要 instance_id ベースで抑制可能
実装の複雑度 中(Orchestrator / Activity の分離が必要)

処理フロー

  1. Blob 作成イベントが発生
  2. Event Grid が BlobCreatedEventGridProbe 関数(Event Grid トリガー)を呼び出す
  3. 同関数は Durable Orchestration を起動して 即座に終了(本検証では約 0.5 秒)
  4. Event Grid には正常応答が返る → タイムアウト・リトライは発生しない
  5. EventGridProbeOrchestrator(オーケストレーター)が SleepAndLogEventGridProbe(Activity)を呼び出す
  6. Activity 内で長時間処理(本検証では 240 秒の time.sleep)を実行
  7. Activity 完了後、オーケストレーターが再開してフローを完了させる

事前準備

利用したリソース

リソース 用途
Azure Storage イベント発生源となる Blob ストレージ
Azure Functions Event Grid トリガー関数(エンドポイント)
Azure Event Grid Blob 作成イベントを Functions にルーティング

関数のデプロイ

検証用のコードは前回の記事を踏襲しつつ、Durable Functions パターンに改修しています。実装上のポイントは以下の通りです。

  • Event Grid トリガー関数 では client.start_new() でオーケストレーションを起動するだけで、長時間処理は行わない
  • instance_idevent.id ベース(f"eg-{event.id}")で生成し、Event Grid 側でリトライが発生した場合でも重複起動を抑制
  • 起動前に client.get_status(instance_id) で既存インスタンスの状態を確認し、running / pending / continuedAsNew のいずれかであればスキップ
  • 長時間処理(time.sleep)は Activity 関数 内で実行
コードはこちら
import json
import logging
import os
import time
from datetime import datetime, timezone, timedelta

import azure.functions as func
import azure.durable_functions as df


app = df.DFApp()

# 既存の WAITE_SECONDS も互換で読む。今後は WAIT_SECONDS 推奨。
WAIT_SECONDS = int(os.getenv("WAIT_SECONDS", os.getenv("WAITE_SECONDS", "50")))
ACTIVE_RUNTIME_STATUSES = {"running", "pending", "continuedasnew"}


def _runtime_status_text(runtime_status):
    if runtime_status is None:
        return None

    runtime_status_value = (
        getattr(runtime_status, "value", None)
        or getattr(runtime_status, "name", None)
        or runtime_status
    )
    return str(runtime_status_value)


def _is_active_runtime_status(runtime_status):
    if runtime_status is None:
        return False

    normalized = runtime_status.rsplit(".", 1)[-1].lower()
    return normalized in ACTIVE_RUNTIME_STATUSES


def _existing_status_for_log(existing_status):
    if isinstance(existing_status, dict):
        return existing_status

    for method_name in ("to_json", "to_dict"):
        method = getattr(existing_status, method_name, None)
        if callable(method):
            try:
                return method()
            except Exception as exc:
                return {
                    "repr": repr(existing_status),
                    "serialization_error": f"{method_name}: {exc}",
                }

    status_dict = getattr(existing_status, "__dict__", None)
    if status_dict is not None:
        return status_dict

    return existing_status


@app.function_name(name="BlobCreatedEventGridProbe")
@app.event_grid_trigger(arg_name="event")
@app.durable_client_input(client_name="client")
async def blob_created_event_grid_probe(
    event: func.EventGridEvent,
    client: df.DurableOrchestrationClient,
) -> None:
    """
    Event Grid から直接呼ばれる関数。

    ここでは長時間処理をしない。
    Durable Orchestration を開始して、すぐに正常終了する。
    """

    data = event.get_json()
    blob_url = data.get("url") if isinstance(data, dict) else None

    event_time = event.event_time
    if event_time is not None and event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    elif event_time is not None:
        event_time = event_time.astimezone(timezone.utc)

    payload = {
        "probe": "event-grid-blob-created-latency",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": event.id,
        "event_type": event.event_type,
        "event_time": event_time.isoformat() if event_time else None,
        "subject": event.subject,
        "blob_url": blob_url,
        "data": data,
        "wait_seconds": WAIT_SECONDS,
    }

    # Event Grid の同一イベントが再配信されたか確認しやすくするため、
    # event.id から固定 instance_id を作る。
    instance_id = f"eg-{event.id}"

    logging.info(
        "EVENTGRID_PROBE_RECEIVED %s",
        json.dumps(
            {
                **payload,
                "instance_id": instance_id,
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    # 重複配送、または Event Grid retry が発生した場合に、
    # 同じ event_id / instance_id で検知しやすくする。
    existing_status = await client.get_status(instance_id)

    if existing_status is not None:
        runtime_status = getattr(existing_status, "runtime_status", None)

        if runtime_status is None:
            runtime_status = getattr(existing_status, "runtimeStatus", None)

        if runtime_status is None and isinstance(existing_status, dict):
            runtime_status = (
                existing_status.get("runtimeStatus")
                or existing_status.get("runtime_status")
            )

        runtime_status_text = _runtime_status_text(runtime_status)

        logging.info(
            "EVENTGRID_PROBE_EXISTING_STATUS_RAW %s",
            json.dumps(
                {
                    "event_id": event.id,
                    "instance_id": instance_id,
                    "existing_status_type": str(type(existing_status)),
                    "existing_status": _existing_status_for_log(existing_status),
                    "existing_status_repr": repr(existing_status),
                    "runtime_status": runtime_status_text,
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
                ensure_ascii=False,
                default=str,
            ),
        )

        if _is_active_runtime_status(runtime_status_text):
            logging.info(
                "EVENTGRID_PROBE_DUPLICATE_OR_ALREADY_STARTED %s",
                json.dumps(
                    {
                        "event_id": event.id,
                        "instance_id": instance_id,
                        "runtime_status": runtime_status_text,
                        "utc_now": datetime.now(timezone.utc).isoformat(),
                    },
                    ensure_ascii=False,
                    default=str,
                ),
            )
            return

        logging.info(
            "EVENTGRID_PROBE_EXISTING_STATUS_NOT_DUPLICATE %s",
            json.dumps(
                {
                    "event_id": event.id,
                    "instance_id": instance_id,
                    "runtime_status": runtime_status_text,
                    "reason": "runtime_status_is_not_active",
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    started_instance_id = await client.start_new(
        "EventGridProbeOrchestrator",
        instance_id,
        payload,
    )

    logging.info(
        "EVENTGRID_PROBE_ORCHESTRATION_STARTED %s",
        json.dumps(
            {
                "event_id": event.id,
                "instance_id": started_instance_id,
                "wait_seconds": WAIT_SECONDS,
                "utc_now": datetime.now(timezone.utc).isoformat(),
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    # ここで return するため、Event Grid にはすぐ成功応答される。
    return


@app.function_name(name="EventGridProbeOrchestrator")
@app.orchestration_trigger(context_name="context")
def event_grid_probe_orchestrator(context: df.DurableOrchestrationContext):
    """
    Durable Orchestrator。
    Activity 内で time.sleep する。
    """

    payload = context.get_input()
    result = yield context.call_activity("SleepAndLogEventGridProbe", payload)
    return result


@app.function_name(name="SleepAndLogEventGridProbe")
@app.activity_trigger(input_name="payload")
def sleep_and_log_event_grid_probe(payload: dict) -> dict:
    """
    Activity 内で time.sleep する。
    """

    wait_seconds = int(payload.get("wait_seconds", 50))

    logging.info(
        "EVENTGRID_PROBE_ACTIVITY_SLEEP_STARTED %s",
        json.dumps(
            {
                "utc_now": datetime.now(timezone.utc).isoformat(),
                "event_id": payload.get("event_id"),
                "wait_seconds": wait_seconds,
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    time.sleep(wait_seconds)

    record = {
        "probe": "event-grid-blob-created-latency",
        "phase": "activity_sleep_completed",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": payload.get("event_id"),
        "event_type": payload.get("event_type"),
        "event_time": payload.get("event_time"),
        "subject": payload.get("subject"),
        "blob_url": payload.get("blob_url"),
        "wait_seconds": wait_seconds,
    }

    logging.info(
        "EVENTGRID_PROBE_ACTIVITY_SLEEP_COMPLETED %s",
        json.dumps(record, ensure_ascii=False, default=str),
    )

    return record

requirements.txtは下記です。

# Uncomment to enable Azure Monitor OpenTelemetry
# Ref: aka.ms/functions-azure-monitor-python
# azure-monitor-opentelemetry

azure-functions
azure-functions-durable

host.jsonは下記です。

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "extensions": {
    "durableTask": {
      "hubName": "EventGridProbeHub"
    }
  }
}

WAIT_SECONDS 環境変数に 240 を設定

Activity 関数内での待機時間を、Event Grid のタイムアウト(30 秒)を大きく超える 240 秒 に設定します。これにより「直接実行パターンであれば確実にタイムアウト → リトライが発生する」状況を再現しています。

image.png

ファイルを配置

Blob ストレージにファイルを配置し、Event Grid イベントをトリガーします。

image.png

検証結果

Azure Event Grid のログを確認

Delivered Events として記録されており、リトライが発生していない ことを確認できました。Event Grid トリガー関数が即座に正常応答を返したため、Event Grid 側からは「正常に配信完了したイベント」として扱われています。

image.png

blob_created_event_grid_probe 関数の確認

Event Grid トリガー関数自体は 約 0.5 秒で完了 しており、30 秒以下という Event Grid の要件を満たしていることを確認できました。この関数の内部から event_grid_probe_orchestrator(オーケストレーター)を起動しています。

image.png

event_grid_probe_orchestrator 関数の確認

オーケストレーター関数が sleep_and_log_event_grid_probe Activity を呼び出していることを確認できます。

ここで注意すべきは、Durable Functions のオーケストレーターは Activity の完了を同期的にブロックして待っているわけではない という点です。yield context.call_activity(...) の時点で一度サスペンドし、Activity 完了イベントを受けて再開する、というイベントソーシング型の挙動になります。そのため、最初の実行ログは Activity を起動した時点で出力されます。

image.png

Activity 完了後にオーケストレーターが再開され、もう一度ログが出力されることを確認できました。

image.png

sleep_and_log_event_grid_probe 関数の確認

環境変数で指定した 240 秒の待機が完遂 されていることを確認できます。Event Grid の 30 秒タイムアウトを大きく超える処理であっても、リトライが発生せず最後まで実行されました。

image.png

まとめ

本検証で確認できた事項は以下の通りです。

  • Event Grid トリガー関数の中で直接長時間処理を実行すると、30 秒タイムアウトでリトライが発生する仕様がある
  • Event Grid トリガー関数の責務を 「Durable Orchestration の起動のみ」 に限定し、実処理を Activity 関数に切り出すことで、この制約を回避できる
  • instance_idevent.id ベース(f"eg-{event.id}")に設定することで、Event Grid のリトライ時にも重複起動を抑制できる(本検証では Event Grid 側でリトライ自体が発生しなかったため、抑制ロジックが発火するケースは観測されませんでした)
  • 環境変数で指定した 240 秒の長時間処理を、Event Grid 側でリトライさせることなく完遂できることを確認した

長時間処理が必要なイベント駆動ワークロード(大容量ファイルの後段処理、外部 API への連続呼び出しなど)では、Event Grid + Durable Functions の組み合わせが有力な選択肢になります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?