概要
Azure Blob Storage に Blob が作成されたイベントを Azure Event Grid 経由で受け取り、Azure Functions (Durable Functions) からコピー先の Blob コンテナーへサーバー側でコピーするサンプルコードを紹介します。本記事は下記の記事をベースにしています。
- Azure Storage SDK で Python により Azure Blob Storage 上のファイルをコピーする 4 つの方法 #AzureStorage - Qiita
- Azure Event Grid の 30 秒タイムアウト制限を Azure Durable Functions で回避する #AzureFunctions - Qiita
事前準備
利用したリソース
| リソース | 用途 |
|---|---|
| Azure Storage | イベント発生源となる Blob ストレージ |
| Azure Functions | Event Grid トリガー関数(エンドポイント)と、コピーを実行する Durable Functions のオーケストレーター / アクティビティ |
| Azure Event Grid | Blob 作成イベントを Functions にルーティング |
Azure Functions にコードを配置
以下のコードを関数アプリ (Python v2 プログラミング モデル) に配置します。
import base64
import json
import logging
import os
from datetime import datetime, timezone
from urllib.parse import unquote, urlparse
import azure.functions as func
import azure.durable_functions as df
from azure.storage.blob import BlobBlock, BlobClient, ContainerClient
import hashlib
# Durable Functions-enabled function app (Python v2 programming model). The
# @app.* decorators below register the Event Grid trigger, the orchestrator
# and the copy activity on this object.
app = df.DFApp()
_BYTES_PER_MIB = 1024 * 1024


def _int_from_env(name: str, default: str) -> int:
    """Return environment variable ``name`` parsed with int(), or ``default`` if unset."""
    return int(os.getenv(name, default))


# upload_blob_from_url is limited to 5,000 MiB, so by default the copy switches
# to block staging at a safer 4,500 MiB.
COPY_THRESHOLD_MIB = _int_from_env("COPY_THRESHOLD_MIB", "4500")
COPY_THRESHOLD_BYTES = COPY_THRESHOLD_MIB * _BYTES_PER_MIB

# Block size used for each stage_block_from_url call.
COPY_BLOCK_SIZE_MIB = _int_from_env("COPY_BLOCK_SIZE_MIB", "100")
COPY_BLOCK_SIZE_BYTES = COPY_BLOCK_SIZE_MIB * _BYTES_PER_MIB

# Blob URLs delivered by Event Grid normally carry no SAS; for private source
# blobs set SOURCE_BLOB_SAS_TOKEN, e.g. "sv=...&se=...&sp=r&sig=...".
SOURCE_BLOB_SAS_TOKEN = os.getenv("SOURCE_BLOB_SAS_TOKEN")

# Destination account: TARGET_STORAGE_CONNECTION_STRING, falling back to
# AzureWebJobsStorage only when the former is unset or empty.
_explicit_target_connection = os.getenv("TARGET_STORAGE_CONNECTION_STRING")
TARGET_STORAGE_CONNECTION_STRING = _explicit_target_connection or os.getenv(
    "AzureWebJobsStorage"
)
TARGET_CONTAINER_NAME = os.getenv("TARGET_CONTAINER_NAME")
# Optional prefix for destination blob names, stored without leading/trailing "/".
TARGET_BLOB_PREFIX = os.getenv("TARGET_BLOB_PREFIX", "").strip("/")

# Lower-cased Durable runtime statuses treated as "orchestration still in flight".
ACTIVE_RUNTIME_STATUSES = {"pending", "running", "continuedasnew"}
def _runtime_status_text(runtime_status):
    """Render a Durable runtime status as text; None stays None.

    Enum-like objects are rendered from their ``value`` attribute, falling
    back to ``name`` when ``value`` is missing or falsy. Anything else is
    passed through ``str()`` as-is.
    """
    if runtime_status is None:
        return None
    for attribute in ("value", "name"):
        candidate = getattr(runtime_status, attribute, None)
        if candidate:
            return str(candidate)
    return str(runtime_status)
def _is_active_runtime_status(runtime_status):
    """Return True when the status text names an orchestration still in flight.

    Accepts a plain name ("Running") or qualified enum text
    ("OrchestrationRuntimeStatus.Running"). Only the part after the last "."
    is compared, case-insensitively, against ACTIVE_RUNTIME_STATUSES.
    """
    if runtime_status is None:
        return False
    _, _, short_name = runtime_status.rpartition(".")
    return short_name.lower() in ACTIVE_RUNTIME_STATUSES
def _existing_status_for_log(existing_status):
    """Best-effort conversion of a Durable status object into a loggable value.

    The strategies are tried in this order:

    * a dict is returned unchanged;
    * the first callable ``to_json`` / ``to_dict`` method is used. If it
      raises, a dict with the object's repr and the error text is returned
      instead of propagating;
    * the instance ``__dict__``;
    * finally the object itself.
    """
    if isinstance(existing_status, dict):
        return existing_status

    serializers = (
        (method_name, getattr(existing_status, method_name, None))
        for method_name in ("to_json", "to_dict")
    )
    for method_name, serializer in serializers:
        if not callable(serializer):
            continue
        try:
            return serializer()
        except Exception as exc:
            # Logging helper only: serialization problems must not break the trigger.
            return {
                "repr": repr(existing_status),
                "serialization_error": f"{method_name}: {exc}",
            }

    attributes = getattr(existing_status, "__dict__", None)
    return existing_status if attributes is None else attributes
def _append_sas_token(blob_url: str, sas_token: str | None) -> str:
if not sas_token:
return blob_url
normalized_sas_token = sas_token.lstrip("?")
separator = "&" if urlparse(blob_url).query else "?"
return f"{blob_url}{separator}{normalized_sas_token}"
def _extract_blob_name_from_url(blob_url: str) -> str:
    """Return the URL-decoded blob name from a blob URL.

    Assumes the URL path has the form "/<container>/<blob name>".
    Everything after the first path segment is returned, so virtual
    directories are kept. The query string is ignored.

    Raises:
        ValueError: The path has no blob part after the container.
    """
    container_and_blob = urlparse(blob_url).path.lstrip("/")
    _container, separator, blob_name = container_and_blob.partition("/")
    if not separator or not blob_name:
        raise ValueError(f"Blob 名を URL から取得できませんでした。blob_url={blob_url}")
    return unquote(blob_name)
def _build_target_blob_name(source_blob_url: str) -> str:
    """Map a source blob URL to the destination blob name.

    The source blob name (container removed) is reused as-is. It is
    prefixed with "TARGET_BLOB_PREFIX/" when a prefix is configured.
    """
    blob_name = _extract_blob_name_from_url(source_blob_url)
    return f"{TARGET_BLOB_PREFIX}/{blob_name}" if TARGET_BLOB_PREFIX else blob_name
def _get_target_blob_client(source_blob_url: str) -> tuple[BlobClient, str]:
    """Create the destination BlobClient for ``source_blob_url``.

    Returns:
        ``(blob_client, target_blob_name)``.

    Raises:
        RuntimeError: The destination connection string or container name
            is not configured. The connection string is checked first.
    """
    required_settings = (
        (
            TARGET_STORAGE_CONNECTION_STRING,
            "TARGET_STORAGE_CONNECTION_STRING または AzureWebJobsStorage を設定してください。",
        ),
        (TARGET_CONTAINER_NAME, "TARGET_CONTAINER_NAME を設定してください。"),
    )
    for setting_value, missing_message in required_settings:
        if not setting_value:
            raise RuntimeError(missing_message)

    target_blob_name = _build_target_blob_name(source_blob_url)
    container_client = ContainerClient.from_connection_string(
        conn_str=TARGET_STORAGE_CONNECTION_STRING,
        container_name=TARGET_CONTAINER_NAME,
    )
    blob_client = container_client.get_blob_client(target_blob_name)
    return blob_client, target_blob_name
def _content_length_from_event_data(data: dict | None) -> int | None:
if not isinstance(data, dict):
return None
for key in (
"contentLength",
"content_length",
"ContentLength",
"size",
"length",
):
value = data.get(key)
if value is None:
continue
try:
return int(value)
except (TypeError, ValueError):
logging.warning(
"BLOB_COPY_INVALID_CONTENT_LENGTH %s",
json.dumps(
{
"key": key,
"value": value,
"utc_now": datetime.now(timezone.utc).isoformat(),
},
ensure_ascii=False,
default=str,
),
)
return None
def _get_source_blob_size(source_blob_sas_url: str, data: dict | None) -> tuple[int, str]:
    """Return ``(size_in_bytes, origin)`` for the source blob.

    The size in the Event Grid payload is preferred (origin
    "event_grid_data"). Only when the payload has none is the source
    blob's properties request made (origin "source_blob_properties").
    """
    size_from_event = _content_length_from_event_data(data)
    if size_from_event is None:
        source_properties = BlobClient.from_blob_url(source_blob_sas_url).get_blob_properties()
        return int(source_properties.size), "source_blob_properties"
    return size_from_event, "event_grid_data"
def _make_block_id(block_index: int) -> str:
    """Return the base64-encoded block ID for ``block_index``.

    All block IDs of one blob must have the same length, so the index is
    right-aligned in a 12-character, space-padded field before encoding.
    For example, 0 becomes "           0".
    """
    fixed_width = format(block_index, "12d")
    encoded = base64.b64encode(fixed_width.encode("ascii"))
    return encoded.decode("ascii")
def _copy_with_upload_blob_from_url(
    source_blob_sas_url: str,
    target_blob_client: BlobClient,
) -> dict:
    """Copy the whole source blob in a single upload_blob_from_url call.

    Any existing destination blob is overwritten. The returned summary
    reports ``block_count`` 0 because no blocks are staged on this path.
    """
    summary = {"method": "upload_blob_from_url", "block_count": 0}
    target_blob_client.upload_blob_from_url(
        source_url=source_blob_sas_url,
        overwrite=True,
    )
    return summary
def _copy_with_stage_block_from_url(
    source_blob_sas_url: str,
    source_size: int,
    target_blob_client: BlobClient,
) -> dict:
    """Copy a large blob server-side with stage_block_from_url + commit_block_list.

    The source is split into ``COPY_BLOCK_SIZE_BYTES`` ranges. Each range is
    staged on the destination as one block, and all blocks are then
    committed in order.

    The source blob's content settings (Content-Type, Content-Encoding, ...)
    are applied on commit. This keeps the result consistent with the
    upload_blob_from_url path, which copies source blob properties by
    default.

    Args:
        source_blob_sas_url: Source blob URL the storage service can read
            (with SAS appended when required).
        source_size: Source size in bytes; determines the staged ranges.
        target_blob_client: Destination blob client; the blob is replaced
            on commit.

    Returns:
        Summary dict with ``method`` and ``block_count``.

    Raises:
        ValueError: ``COPY_BLOCK_SIZE_MIB`` is not positive, or the copy
            would need more blocks than one block blob can hold. Both checks
            run before anything is staged.
    """
    # A block blob can commit at most 50,000 blocks.
    max_blocks_per_blob = 50_000
    block_size = COPY_BLOCK_SIZE_BYTES
    if block_size <= 0:
        # A negative step makes range() empty and would commit an EMPTY target
        # blob; zero would fail inside range() with an unhelpful message.
        raise ValueError(
            "COPY_BLOCK_SIZE_MIB には正の値を設定してください。"
            f"COPY_BLOCK_SIZE_MIB={COPY_BLOCK_SIZE_MIB}"
        )
    block_count = -(-source_size // block_size)  # ceiling division
    if block_count > max_blocks_per_blob:
        # Fail fast instead of staging every block and then having the commit rejected.
        raise ValueError(
            "ブロック数が上限を超えます。COPY_BLOCK_SIZE_MIB を大きくしてください。"
            f"source_size={source_size}, block_count={block_count}, "
            f"max_blocks={max_blocks_per_blob}"
        )

    # Read the source properties so the commit can carry the same content settings.
    source_props = BlobClient.from_blob_url(source_blob_sas_url).get_blob_properties()

    block_list: list[BlobBlock] = []
    for block_index, source_offset in enumerate(range(0, source_size, block_size)):
        # The final block may be shorter than block_size.
        source_length = min(block_size, source_size - source_offset)
        block_id = _make_block_id(block_index)
        target_blob_client.stage_block_from_url(
            block_id=block_id,
            source_url=source_blob_sas_url,
            source_offset=source_offset,
            source_length=source_length,
        )
        block_list.append(BlobBlock(block_id=block_id))
        logging.info(
            "BLOB_COPY_BLOCK_STAGED %s",
            json.dumps(
                {
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                    "block_index": block_index,
                    "block_id": block_id,
                    "source_offset": source_offset,
                    "source_length": source_length,
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    target_blob_client.commit_block_list(
        block_list,
        content_settings=source_props.content_settings,
    )
    return {
        "method": "stage_block_from_url_commit_block_list",
        "block_count": len(block_list),
    }
def _extract_runtime_status(existing_status):
    """Pull the raw runtime status out of a Durable ``get_status()`` result.

    Checks the ``runtime_status`` and ``runtimeStatus`` attributes, then the
    same keys when the result is a dict. Returns None when none is set.
    """
    runtime_status = getattr(existing_status, "runtime_status", None)
    if runtime_status is None:
        runtime_status = getattr(existing_status, "runtimeStatus", None)
    if runtime_status is None and isinstance(existing_status, dict):
        runtime_status = (
            existing_status.get("runtimeStatus")
            or existing_status.get("runtime_status")
        )
    return runtime_status


@app.function_name(name="BlobCreatedEventGridProbe")
@app.event_grid_trigger(arg_name="event")
@app.durable_client_input(client_name="client")
async def blob_created_event_grid_probe(
    event: func.EventGridEvent,
    client: df.DurableOrchestrationClient,
) -> None:
    """Entry point invoked directly by Event Grid.

    No long-running work happens here. The function starts a Durable
    orchestration that performs the copy and returns immediately, so Event
    Grid gets a quick success response.

    Events without ``data.url`` are logged and skipped, because the copy
    activity cannot run without a source URL. If an orchestration with the
    same instance ID is still in an active state, no new one is started.
    """
    data = event.get_json()
    blob_url = data.get("url") if isinstance(data, dict) else None

    event_time = event.event_time
    if event_time is not None and event_time.tzinfo is None:
        # A naive timestamp is treated as UTC.
        event_time = event_time.replace(tzinfo=timezone.utc)
    elif event_time is not None:
        event_time = event_time.astimezone(timezone.utc)

    payload = {
        "probe": "event-grid-blob-created-copy",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": event.id,
        "event_type": event.event_type,
        "event_time": event_time.isoformat() if event_time else None,
        "subject": event.subject,
        "blob_url": blob_url,
        "data": data,
        "copy_threshold_mib": COPY_THRESHOLD_MIB,
        "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
        "target_container_name": TARGET_CONTAINER_NAME,
        "target_blob_prefix": TARGET_BLOB_PREFIX,
    }

    if not blob_url:
        # The activity would only fail with ValueError; skip instead of
        # starting an orchestration that cannot succeed.
        logging.warning(
            "EVENTGRID_COPY_SKIPPED_NO_BLOB_URL %s",
            json.dumps(payload, ensure_ascii=False, default=str),
        )
        return

    # The instance ID is derived from the blob URL + source eTag (not event.id),
    # so a redelivered event for the same blob version maps to the same instance.
    instance_id = _make_copy_instance_id(blob_url, data)
    logging.info(
        "EVENTGRID_COPY_RECEIVED %s",
        json.dumps(
            {
                **payload,
                "instance_id": instance_id,
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    # Detect duplicate delivery / Event Grid retries through the deterministic
    # instance ID and skip while that orchestration is still active.
    existing_status = await client.get_status(instance_id)
    if existing_status is not None:
        runtime_status_text = _runtime_status_text(_extract_runtime_status(existing_status))
        logging.info(
            "EVENTGRID_COPY_EXISTING_STATUS_RAW %s",
            json.dumps(
                {
                    "event_id": event.id,
                    "instance_id": instance_id,
                    "existing_status_type": str(type(existing_status)),
                    "existing_status": _existing_status_for_log(existing_status),
                    "existing_status_repr": repr(existing_status),
                    "runtime_status": runtime_status_text,
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
                ensure_ascii=False,
                default=str,
            ),
        )
        if _is_active_runtime_status(runtime_status_text):
            logging.info(
                "EVENTGRID_COPY_DUPLICATE_OR_ALREADY_STARTED %s",
                json.dumps(
                    {
                        "event_id": event.id,
                        "instance_id": instance_id,
                        "runtime_status": runtime_status_text,
                        "utc_now": datetime.now(timezone.utc).isoformat(),
                    },
                    ensure_ascii=False,
                    default=str,
                ),
            )
            return
        logging.info(
            "EVENTGRID_COPY_EXISTING_STATUS_NOT_DUPLICATE %s",
            json.dumps(
                {
                    "event_id": event.id,
                    "instance_id": instance_id,
                    "runtime_status": runtime_status_text,
                    "reason": "runtime_status_is_not_active",
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    started_instance_id = await client.start_new(
        "EventGridProbeOrchestrator",
        instance_id,
        payload,
    )
    logging.info(
        "EVENTGRID_COPY_ORCHESTRATION_STARTED %s",
        json.dumps(
            {
                "event_id": event.id,
                "instance_id": started_instance_id,
                "copy_threshold_mib": COPY_THRESHOLD_MIB,
                "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
                "utc_now": datetime.now(timezone.utc).isoformat(),
            },
            ensure_ascii=False,
            default=str,
        ),
    )
    # Returning here lets Event Grid receive a success response right away.
    return
@app.function_name(name="EventGridProbeOrchestrator")
@app.orchestration_trigger(context_name="context")
def event_grid_probe_orchestrator(context: df.DurableOrchestrationContext):
    """Durable orchestrator that runs the size-dependent copy activity.

    The payload built by the Event Grid trigger is forwarded unchanged to
    the ``CopyBlobFromUrlBySize`` activity. The activity's result becomes
    the orchestration output.
    """
    copy_result = yield context.call_activity(
        "CopyBlobFromUrlBySize",
        context.get_input(),
    )
    return copy_result
@app.function_name(name="CopyBlobFromUrlBySize")
@app.activity_trigger(input_name="payload")
def copy_blob_from_url_by_size(payload: dict) -> dict:
    """Copy the source blob to the destination, choosing the API by size.

    Sources up to COPY_THRESHOLD_MIB MiB (environment-configurable, default
    4,500) are copied with upload_blob_from_url. Larger sources use
    stage_block_from_url + commit_block_list.

    A BLOB_COPY_SIZE_MISMATCH warning is logged when the destination size
    differs from the source size used to plan the copy.

    Args:
        payload: Dict built by the Event Grid trigger. ``blob_url`` is
            required. ``data`` (Event Grid data) supplies the size when it
            carries one. The other keys are only echoed into the result.

    Returns:
        Record describing the copy (sizes, target name, method, block
        count), also logged as BLOB_COPY_COMPLETED.

    Raises:
        ValueError: ``payload["blob_url"]`` is missing or empty.
    """
    source_blob_url = payload.get("blob_url")
    if not source_blob_url:
        raise ValueError("payload.blob_url が空です。")

    # Event Grid blob URLs usually carry no SAS; append one for private sources.
    source_blob_sas_url = _append_sas_token(source_blob_url, SOURCE_BLOB_SAS_TOKEN)
    source_size, source_size_source = _get_source_blob_size(
        source_blob_sas_url=source_blob_sas_url,
        data=payload.get("data"),
    )
    target_blob_client, target_blob_name = _get_target_blob_client(source_blob_url)
    logging.info(
        "BLOB_COPY_STARTED %s",
        json.dumps(
            {
                "utc_now": datetime.now(timezone.utc).isoformat(),
                "event_id": payload.get("event_id"),
                "source_blob_url": source_blob_url,
                "source_size": source_size,
                "source_size_source": source_size_source,
                "copy_threshold_bytes": COPY_THRESHOLD_BYTES,
                "copy_threshold_mib": COPY_THRESHOLD_MIB,
                "copy_block_size_bytes": COPY_BLOCK_SIZE_BYTES,
                "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
                "target_container_name": TARGET_CONTAINER_NAME,
                "target_blob_name": target_blob_name,
            },
            ensure_ascii=False,
            default=str,
        ),
    )

    if source_size <= COPY_THRESHOLD_BYTES:
        copy_result = _copy_with_upload_blob_from_url(
            source_blob_sas_url=source_blob_sas_url,
            target_blob_client=target_blob_client,
        )
    else:
        copy_result = _copy_with_stage_block_from_url(
            source_blob_sas_url=source_blob_sas_url,
            source_size=source_size,
            target_blob_client=target_blob_client,
        )

    target_props = target_blob_client.get_blob_properties()
    target_size = int(target_props.size)
    if target_size != source_size:
        # source_size may come from the Event Grid payload; a mismatch means the
        # planned size and the copied result differ, so surface it explicitly.
        logging.warning(
            "BLOB_COPY_SIZE_MISMATCH %s",
            json.dumps(
                {
                    "utc_now": datetime.now(timezone.utc).isoformat(),
                    "event_id": payload.get("event_id"),
                    "source_blob_url": source_blob_url,
                    "source_size": source_size,
                    "source_size_source": source_size_source,
                    "target_blob_name": target_blob_name,
                    "target_size": target_size,
                },
                ensure_ascii=False,
                default=str,
            ),
        )

    record = {
        "probe": "event-grid-blob-created-copy",
        "phase": "activity_copy_completed",
        "utc_now": datetime.now(timezone.utc).isoformat(),
        "event_id": payload.get("event_id"),
        "event_type": payload.get("event_type"),
        "event_time": payload.get("event_time"),
        "subject": payload.get("subject"),
        "source_blob_url": source_blob_url,
        "source_size": source_size,
        "source_size_mib": round(source_size / 1024 / 1024, 2),
        "source_size_source": source_size_source,
        "target_container_name": TARGET_CONTAINER_NAME,
        "target_blob_name": target_blob_name,
        "target_size": target_size,
        "copy_threshold_mib": COPY_THRESHOLD_MIB,
        "copy_block_size_mib": COPY_BLOCK_SIZE_MIB,
        **copy_result,
    }
    logging.info(
        "BLOB_COPY_COMPLETED %s",
        json.dumps(record, ensure_ascii=False, default=str),
    )
    return record
def _make_copy_instance_id(blob_url: str, data: dict | None) -> str:
source_etag = None
if isinstance(data, dict):
source_etag = data.get("eTag") or data.get("etag")
idempotency_key = {
"blob_url": blob_url,
"source_etag": source_etag,
}
raw = json.dumps(idempotency_key, sort_keys=True, ensure_ascii=False)
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
return f"blob-copy-{digest}"

















