0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Event Grid 経由で Azure Functions により Azure Blob Storage 間でコピーするサンプルコード

0
Last updated at Posted at 2026-05-17

概要

Azure Event Grid 経由で Azure Functions により Azure Blob Storage 間でコピーするサンプルコードを提示します。下記の記事がベースとなっています。

事前準備

利用したリソース

| リソース | 用途 |
| --- | --- |
| Azure Storage | イベント発生源となる Blob ストレージ |
| Azure Functions | Event Grid トリガー関数(エンドポイント) |
| Azure Event Grid | Blob 作成イベントを Functions にルーティング |

Azure Functions にコードを配置

コードを配置
import base64
import json
import logging
import os
from datetime import datetime, timezone
from urllib.parse import unquote, urlparse

import azure.functions as func
import azure.durable_functions as df
from azure.storage.blob import BlobBlock, BlobClient, ContainerClient
import hashlib


app = df.DFApp()

# upload_blob_from_url (Put Blob From URL) is limited to 5,000 MiB sources,
# so by default switch to the staged-block path at a safer 4,500 MiB.
_UPLOAD_BLOB_FROM_URL_LIMIT_MIB = 5000
COPY_THRESHOLD_MIB = int(os.getenv("COPY_THRESHOLD_MIB", "4500"))
if COPY_THRESHOLD_MIB > _UPLOAD_BLOB_FROM_URL_LIMIT_MIB:
    # A larger threshold would route blobs above the service limit to
    # upload_blob_from_url, which then fails per copy; fail fast at startup.
    raise ValueError(
        f"COPY_THRESHOLD_MIB は {_UPLOAD_BLOB_FROM_URL_LIMIT_MIB} 以下にしてください。"
        f"value={COPY_THRESHOLD_MIB}"
    )
COPY_THRESHOLD_BYTES = COPY_THRESHOLD_MIB * 1024 * 1024

# Block size used by stage_block_from_url.
COPY_BLOCK_SIZE_MIB = int(os.getenv("COPY_BLOCK_SIZE_MIB", "100"))
if COPY_BLOCK_SIZE_MIB <= 0:
    # A zero step makes range() raise and a negative step makes it empty; the
    # latter would commit an empty block list and truncate the target blob.
    raise ValueError(
        f"COPY_BLOCK_SIZE_MIB は 1 以上にしてください。value={COPY_BLOCK_SIZE_MIB}"
    )
COPY_BLOCK_SIZE_BYTES = COPY_BLOCK_SIZE_MIB * 1024 * 1024

# The blob URL delivered by Event Grid normally carries no SAS, so for private
# source blobs set SOURCE_BLOB_SAS_TOKEN (read permission is sufficient).
# Example: SOURCE_BLOB_SAS_TOKEN="sv=...&se=...&sp=r&sig=..."
SOURCE_BLOB_SAS_TOKEN = os.getenv("SOURCE_BLOB_SAS_TOKEN")

# The destination is chosen by TARGET_STORAGE_CONNECTION_STRING and
# TARGET_CONTAINER_NAME. AzureWebJobsStorage is used only as a fallback when
# TARGET_STORAGE_CONNECTION_STRING is not set.
TARGET_STORAGE_CONNECTION_STRING = (
    os.getenv("TARGET_STORAGE_CONNECTION_STRING") or os.getenv("AzureWebJobsStorage")
)
TARGET_CONTAINER_NAME = os.getenv("TARGET_CONTAINER_NAME")
# Optional virtual-directory prefix for target blob names (surrounding "/" stripped).
TARGET_BLOB_PREFIX = os.getenv("TARGET_BLOB_PREFIX", "").strip("/")

# Lower-cased Durable runtime statuses that the trigger treats as
# "copy already in progress" (compared by _is_active_runtime_status).
ACTIVE_RUNTIME_STATUSES = frozenset({"running", "pending", "continuedasnew"})


def _runtime_status_text(runtime_status):
    if runtime_status is None:
        return None

    runtime_status_value = (
        getattr(runtime_status, "value", None)
        or getattr(runtime_status, "name", None)
        or runtime_status
    )
    return str(runtime_status_value)


def _is_active_runtime_status(runtime_status):
    """Report whether a runtime-status string means the orchestration is still active.

    Both a bare name ("Running") and a qualified enum text
    ("OrchestrationRuntimeStatus.Running") are accepted: only the segment after
    the last dot is lower-cased and looked up in ACTIVE_RUNTIME_STATUSES.
    ``None`` is never active.
    """
    if runtime_status is not None:
        _prefix, _dot, last_segment = runtime_status.rpartition(".")
        return last_segment.lower() in ACTIVE_RUNTIME_STATUSES
    return False


def _existing_status_for_log(existing_status):
    if isinstance(existing_status, dict):
        return existing_status

    for method_name in ("to_json", "to_dict"):
        method = getattr(existing_status, method_name, None)
        if callable(method):
            try:
                return method()
            except Exception as exc:
                return {
                    "repr": repr(existing_status),
                    "serialization_error": f"{method_name}: {exc}",
                }

    status_dict = getattr(existing_status, "__dict__", None)
    if status_dict is not None:
        return status_dict

    return existing_status


def _append_sas_token(blob_url: str, sas_token: str | None) -> str:
    if not sas_token:
        return blob_url

    normalized_sas_token = sas_token.lstrip("?")
    separator = "&" if urlparse(blob_url).query else "?"
    return f"{blob_url}{separator}{normalized_sas_token}"


def _extract_blob_name_from_url(blob_url: str) -> str:
    parsed = urlparse(blob_url)
    path_parts = parsed.path.lstrip("/").split("/", 1)

    if len(path_parts) != 2 or not path_parts[1]:
        raise ValueError(f"Blob 名を URL から取得できませんでした。blob_url={blob_url}")

    return unquote(path_parts[1])


def _build_target_blob_name(source_blob_url: str) -> str:
    """Derive the destination blob name from the source blob URL.

    The decoded source blob name is reused. When TARGET_BLOB_PREFIX is non-empty,
    it is prepended with a "/" separator.
    """
    name_parts = [TARGET_BLOB_PREFIX] if TARGET_BLOB_PREFIX else []
    name_parts.append(_extract_blob_name_from_url(source_blob_url))
    return "/".join(name_parts)


def _get_target_blob_client(source_blob_url: str) -> tuple[BlobClient, str]:
    """Create the BlobClient for the copy destination.

    Returns:
        ``(blob_client, target_blob_name)`` inside TARGET_CONTAINER_NAME.

    Raises:
        RuntimeError: when the destination connection string (or its
            AzureWebJobsStorage fallback) or the container name is not set.
    """
    required_settings = (
        (
            TARGET_STORAGE_CONNECTION_STRING,
            "TARGET_STORAGE_CONNECTION_STRING または AzureWebJobsStorage を設定してください。",
        ),
        (TARGET_CONTAINER_NAME, "TARGET_CONTAINER_NAME を設定してください。"),
    )
    for setting_value, error_message in required_settings:
        if not setting_value:
            raise RuntimeError(error_message)

    blob_name = _build_target_blob_name(source_blob_url)
    container = ContainerClient.from_connection_string(
        conn_str=TARGET_STORAGE_CONNECTION_STRING,
        container_name=TARGET_CONTAINER_NAME,
    )
    blob_client = container.get_blob_client(blob_name)
    return blob_client, blob_name


def _content_length_from_event_data(data: dict | None) -> int | None:
    if not isinstance(data, dict):
        return None

    for key in (
        "contentLength",
        "content_length",
        "ContentLength",
        "size",
        "length",
    ):
        value = data.get(key)
        if value is None:
            continue

        try:
            return int(value)
        except (TypeError, ValueError):
            logging.warning(
                "BLOB_COPY_INVALID_CONTENT_LENGTH %s",
                json.dumps(
                    {
                        "key": key,
                        "value": value,
                        "utc_now": datetime.now(timezone.utc).isoformat(),
                    },
                    ensure_ascii=False,
                    default=str,
                ),
            )

    return None


def _get_source_blob_size(source_blob_sas_url: str, data: dict | None) -> tuple[int, str]:
    """Return ``(size_in_bytes, origin)`` for the source blob.

    The size comes from the Event Grid payload when it carries one (origin
    ``"event_grid_data"``). Otherwise a Get Blob Properties call is made against
    *source_blob_sas_url* (origin ``"source_blob_properties"``).
    """
    event_content_length = _content_length_from_event_data(data)
    if event_content_length is not None:
        return event_content_length, "event_grid_data"

    # Use the client as a context manager so its HTTP pipeline/session is
    # closed after the single properties request instead of being leaked.
    with BlobClient.from_blob_url(source_blob_sas_url) as source_blob_client:
        source_props = source_blob_client.get_blob_properties()
    return int(source_props.size), "source_blob_properties"


def _make_block_id(block_index: int) -> str:
    # Azure Storage の block_id は同一 Blob 内で同じ長さに揃える必要がある。
    raw_block_id = f"{block_index:12d}".encode("ascii")
    return base64.b64encode(raw_block_id).decode("ascii")


def _copy_with_upload_blob_from_url(
    source_blob_sas_url: str,
    target_blob_client: BlobClient,
) -> dict:
    """Copy the whole source into the target with one upload_blob_from_url call.

    An existing target blob is overwritten. Returns the method summary that is
    merged into the copy-completion record.
    """
    copy_arguments = {"source_url": source_blob_sas_url, "overwrite": True}
    target_blob_client.upload_blob_from_url(**copy_arguments)
    return dict(method="upload_blob_from_url", block_count=0)


# A block blob can hold at most 50,000 committed blocks.
_MAX_BLOCKS_PER_BLOB = 50_000


def _copy_with_stage_block_from_url(
    source_blob_sas_url: str,
    source_size: int,
    target_blob_client: BlobClient,
) -> dict:
    """Copy a large source blob by staging ranged blocks and committing them.

    Each range of up to COPY_BLOCK_SIZE_BYTES is copied server-side with
    stage_block_from_url. The ordered block list is then committed with
    commit_block_list, which replaces the target's content.

    Returns:
        The method summary (method name and number of blocks committed).

    Raises:
        ValueError: if the block size is not positive, or if *source_size*
            needs more blocks than a block blob can hold. Both are checked
            before anything is staged, so no server-side copy work is wasted.
    """
    block_size = COPY_BLOCK_SIZE_BYTES
    if block_size <= 0:
        # range() raises for a zero step and is empty for a negative one; the
        # latter would commit an empty list and silently truncate the target.
        raise ValueError(f"COPY_BLOCK_SIZE_BYTES は 1 以上にしてください。value={block_size}")

    offsets = range(0, source_size, block_size)
    if len(offsets) > _MAX_BLOCKS_PER_BLOB:
        raise ValueError(
            f"ブロック数が上限 {_MAX_BLOCKS_PER_BLOB} を超えます。"
            f"COPY_BLOCK_SIZE_MIB を大きくしてください。"
            f"source_size={source_size}, block_count={len(offsets)}"
        )

    block_list: list[BlobBlock] = []

    for block_index, source_offset in enumerate(offsets):
        # The last block may be shorter than block_size.
        source_length = min(block_size, source_size - source_offset)
        block_id = _make_block_id(block_index)

        target_blob_client.stage_block_from_url(
            block_id=block_id,
            source_url=source_blob_sas_url,
            source_offset=source_offset,
            source_length=source_length,
        )

        block_list.append(BlobBlock(block_id=block_id))

        logging.info(
            "BLOB_COPY_BLOCK_STAGED %s",
            json.dumps(
                {
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                    "block_index": block_index,
                    "block_id": block_id,
                    "source_offset": source_offset,
                    "source_length": source_length,
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    # Staged blocks stay uncommitted (invisible) until this call.
    target_blob_client.commit_block_list(block_list)

    return {
        "method": "stage_block_from_url_commit_block_list",
        "block_count": len(block_list),
    }


def _runtime_status_of(existing_status):
    """Return the runtime status stored on a Durable status object or dict.

    Checks the ``runtime_status`` attribute, then ``runtimeStatus``, then (only
    for dicts) the ``runtimeStatus`` / ``runtime_status`` keys. Returns None
    when none of them is set.
    """
    runtime_status = getattr(existing_status, "runtime_status", None)

    if runtime_status is None:
        runtime_status = getattr(existing_status, "runtimeStatus", None)

    if runtime_status is None and isinstance(existing_status, dict):
        runtime_status = (
            existing_status.get("runtimeStatus")
            or existing_status.get("runtime_status")
        )

    return runtime_status


def _log_json(tag: str, fields: dict) -> None:
    """Log *fields* as JSON at INFO level behind the searchable prefix *tag*.

    *tag* must not contain "%", because it becomes part of the format string
    "<tag> %s" (the same format the inline calls used).
    """
    logging.info(
        tag + " %s",
        json.dumps(fields, ensure_ascii=False, default=str),
    )


@app.function_name(name="BlobCreatedEventGridProbe")
@app.event_grid_trigger(arg_name="event")
@app.durable_client_input(client_name="client")
async def blob_created_event_grid_probe(
    event: func.EventGridEvent,
    client: df.DurableOrchestrationClient,
) -> None:
    """
    Entry point invoked directly by Event Grid.

    No long-running work is done here. The function starts the Durable
    orchestration that performs the copy and returns right away, so Event Grid
    gets a quick success response. An event whose payload has no ``url`` is
    logged and acknowledged without starting an orchestration. If an instance
    with the same ID is already active, no new one is started.
    """

    data = event.get_json()
    blob_url = data.get("url") if isinstance(data, dict) else None

    # Normalize the event time to an aware UTC datetime (naive values are
    # assumed to already be UTC).
    event_time = event.event_time
    if event_time is not None and event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    elif event_time is not None:
        event_time = event_time.astimezone(timezone.utc)

    payload = {
        "probe": "event-grid-blob-created-copy",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": event.id,
        "event_type": event.event_type,
        "event_time": event_time.isoformat() if event_time else None,
        "subject": event.subject,
        "blob_url": blob_url,
        "data": data,
        "copy_threshold_mib": COPY_THRESHOLD_MIB,
        "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
        "target_container_name": TARGET_CONTAINER_NAME,
        "target_blob_prefix": TARGET_BLOB_PREFIX,
    }

    # The instance_id is derived from the blob URL and the source eTag (not
    # from event.id). A redelivered or duplicate event for the same blob
    # version therefore maps to the same orchestration instance.
    instance_id = _make_copy_instance_id(blob_url, data)

    _log_json("EVENTGRID_COPY_RECEIVED", {**payload, "instance_id": instance_id})

    if not blob_url:
        # Nothing to copy. Starting an orchestration would only fail later in
        # the activity, so log the event and acknowledge it instead.
        logging.warning(
            "EVENTGRID_COPY_SKIPPED_NO_BLOB_URL %s",
            json.dumps(
                {
                    "event_id": event.id,
                    "event_type": event.event_type,
                    "subject": event.subject,
                    "instance_id": instance_id,
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
                ensure_ascii=False,
                default=str,
            ),
        )
        return

    # Detect duplicate deliveries / Event Grid retries via the shared instance_id.
    existing_status = await client.get_status(instance_id)

    if existing_status is not None:
        runtime_status_text = _runtime_status_text(_runtime_status_of(existing_status))

        _log_json(
            "EVENTGRID_COPY_EXISTING_STATUS_RAW",
            {
                "event_id": event.id,
                "instance_id": instance_id,
                "existing_status_type": str(type(existing_status)),
                "existing_status": _existing_status_for_log(existing_status),
                "existing_status_repr": repr(existing_status),
                "runtime_status": runtime_status_text,
                "utc_now": datetime.now(timezone.utc).isoformat(),
            },
        )

        if _is_active_runtime_status(runtime_status_text):
            _log_json(
                "EVENTGRID_COPY_DUPLICATE_OR_ALREADY_STARTED",
                {
                    "event_id": event.id,
                    "instance_id": instance_id,
                    "runtime_status": runtime_status_text,
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
            )
            return

        # A finished/failed/unknown instance with this ID is started again.
        _log_json(
            "EVENTGRID_COPY_EXISTING_STATUS_NOT_DUPLICATE",
            {
                "event_id": event.id,
                "instance_id": instance_id,
                "runtime_status": runtime_status_text,
                "reason": "runtime_status_is_not_active",
                "utc_now": datetime.now(timezone.utc).isoformat(),
            },
        )

    started_instance_id = await client.start_new(
        "EventGridProbeOrchestrator",
        instance_id,
        payload,
    )

    _log_json(
        "EVENTGRID_COPY_ORCHESTRATION_STARTED",
        {
            "event_id": event.id,
            "instance_id": started_instance_id,
            "copy_threshold_mib": COPY_THRESHOLD_MIB,
            "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
            "utc_now": datetime.now(timezone.utc).isoformat(),
        },
    )

    # Returning here lets Event Grid receive a success response immediately.
    return


@app.function_name(name="EventGridProbeOrchestrator")
@app.orchestration_trigger(context_name="context")
def event_grid_probe_orchestrator(context: df.DurableOrchestrationContext):
    """
    Durable orchestrator for a single blob copy.

    Passes its input payload unchanged to the CopyBlobFromUrlBySize activity,
    which performs the size-dependent server-side copy, and returns the
    activity's result as the orchestration output.
    """
    copy_task = context.call_activity("CopyBlobFromUrlBySize", context.get_input())
    copy_record = yield copy_task
    return copy_record


@app.function_name(name="CopyBlobFromUrlBySize")
@app.activity_trigger(input_name="payload")
def copy_blob_from_url_by_size(payload: dict) -> dict:
    """
    Copy the blob described by *payload* into the configured target container.

    Blobs of at most COPY_THRESHOLD_MIB (4,500 MiB by default) are copied with a
    single upload_blob_from_url call. Larger blobs are copied with
    stage_block_from_url + commit_block_list in COPY_BLOCK_SIZE_MIB blocks.

    Args:
        payload: the dict built by the Event Grid trigger (``blob_url``,
            ``data``, ``event_*`` fields, ``subject``).

    Returns:
        A record of the copy: source/target names and sizes, ``size_matches``,
        and the method summary. It also becomes the orchestration output.

    Raises:
        ValueError: if the payload is missing or has no ``blob_url``.
        Configuration errors and Storage SDK errors from the helpers propagate.
    """

    # Treat a missing payload like one without blob_url (clear ValueError
    # below) instead of failing with AttributeError on None.
    payload = payload or {}

    source_blob_url = payload.get("blob_url")
    if not source_blob_url:
        raise ValueError("payload.blob_url が空です。")

    source_blob_sas_url = _append_sas_token(source_blob_url, SOURCE_BLOB_SAS_TOKEN)
    source_size, source_size_source = _get_source_blob_size(
        source_blob_sas_url=source_blob_sas_url,
        data=payload.get("data"),
    )
    target_blob_client, target_blob_name = _get_target_blob_client(source_blob_url)

    logging.info(
        "BLOB_COPY_STARTED %s",
        json.dumps(
            {
                "utc_now": datetime.now(timezone.utc).isoformat(),
                "event_id": payload.get("event_id"),
                "source_blob_url": source_blob_url,
                "source_size": source_size,
                "source_size_source": source_size_source,
                "copy_threshold_bytes": COPY_THRESHOLD_BYTES,
                "copy_threshold_mib": COPY_THRESHOLD_MIB,
                "copy_block_size_bytes": COPY_BLOCK_SIZE_BYTES,
                "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
                "target_container_name": TARGET_CONTAINER_NAME,
                "target_blob_name": target_blob_name,
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    if source_size <= COPY_THRESHOLD_BYTES:
        copy_result = _copy_with_upload_blob_from_url(
            source_blob_sas_url=source_blob_sas_url,
            target_blob_client=target_blob_client,
        )
    else:
        copy_result = _copy_with_stage_block_from_url(
            source_blob_sas_url=source_blob_sas_url,
            source_size=source_size,
            target_blob_client=target_blob_client,
        )

    target_props = target_blob_client.get_blob_properties()
    target_size = int(target_props.size)

    # The size used to plan the copy may come from the event payload. If the
    # source changed after the event, the target can differ from it, so
    # surface the mismatch instead of reporting a silent success.
    size_matches = target_size == source_size
    if not size_matches:
        logging.warning(
            "BLOB_COPY_SIZE_MISMATCH %s",
            json.dumps(
                {
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                    "event_id": payload.get("event_id"),
                    "source_blob_url": source_blob_url,
                    "source_size": source_size,
                    "source_size_source": source_size_source,
                    "target_blob_name": target_blob_name,
                    "target_size": target_size,
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    record = {
        "probe": "event-grid-blob-created-copy",
        "phase": "activity_copy_completed",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": payload.get("event_id"),
        "event_type": payload.get("event_type"),
        "event_time": payload.get("event_time"),
        "subject": payload.get("subject"),
        "source_blob_url": source_blob_url,
        "source_size": source_size,
        "source_size_mib": round(source_size / 1024 / 1024, 2),
        "source_size_source": source_size_source,
        "target_container_name": TARGET_CONTAINER_NAME,
        "target_blob_name": target_blob_name,
        "target_size": target_size,
        "size_matches": size_matches,
        "copy_threshold_mib": COPY_THRESHOLD_MIB,
        "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
        **copy_result,
    }

    logging.info(
        "BLOB_COPY_COMPLETED %s",
        json.dumps(record, ensure_ascii=False, default=str),
    )

    return record


def _make_copy_instance_id(blob_url: str, data: dict | None) -> str:
    source_etag = None

    if isinstance(data, dict):
        source_etag = data.get("eTag") or data.get("etag")

    idempotency_key = {
        "blob_url": blob_url,
        "source_etag": source_etag,
    }

    raw = json.dumps(idempotency_key, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()

    return f"blob-copy-{digest}"

image.png

環境変数をセット

image.png

image.png

image.png

image.png

image.png

image.png

image.png

補足

1 GiB のファイルをコピー

image.png

image.png

image.png

image.png

image.png

10 GiB のファイルをコピー

image.png

image.png

image.png

image.png

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?