はじめに
Excelで管理しているデータをAzure基盤上に移行し、データ活用基盤を構築する取り組みを行いました。
具体的には、Boxに保存されているExcelファイルをAzureに連携、Azure Functionsでデータ加工を行い、SQL Databaseに保存するフローをAzure Logic Appsで構築。SQL Databaseに蓄積したデータを、最終的にPowerBIで可視化する仕組みを構築しました。
このシステムを構築するうえでポイントとなった技術について前編、後篇に分け投稿します。
BoxとAzure Logic Appsの連携
Logic Appsの「Boxコネクタ」を利用することでコードを記述することなく簡単に接続できます。手順は以下の通りです。
Boxアプリの設定
- Boxの開発者コンソールでカスタムアプリを作成
- OAuth 2.0認証を設定
Logic Apps上でBoxコネクタを作成
- Box上に保存してある複数のファイルを取得するために以下の選択を行う
- Boxアカウントで認証を行い、対象フォルダを指定
Boxで取得したファイルをAzure BlobStorageに保存
後続のAzure Functions内で加工するため、一度、AzureのBlob Storageに保存します。Blob Storageへの保存は、Blob Storageコネクタを使用します。
- 取得したBox内のファイルリストの結果を「For Each」アクションでループ
- ループ内で取得できるBoxのファイルパスから、Boxコネクタの「パスによるファイルコンテンツの取得」を使用してファイルを取得
- Blob Storageコネクタの「BLOBを作成する(V2)」を選択し、取得したファイルをBlob Storageに保存
Logic AppsからAzure Functionsを呼び出す (HTTPアクション)
取り込んだエクセルのデータを加工し、Databaseに保存するため、Azure Functionsを利用します。
Logic AppsからFunctionsを呼び出すには、HTTPアクションを使用します。
※Functionsアクションもありますが、従量課金プランではFunctionsアクションは使えなかったため、HTTPコネクタを使用してFunctionsを呼び出しています。
Functionsの作成
以下の定義を行うことで、HTTPリクエストをトリガーに関数を実行することが出来ます。
本関数内では、GETリクエストを受けたのち、Blob Storageのファイルをtmp領域に保存し、その後、データ加工処理を行っています。
import os
import sys
import shutil
import tempfile
import traceback
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from azure.storage.blob import ContainerClient
import azure.functions as func
import logging
app = func.FunctionApp()
@app.function_name(name="MyHttpTrigger")
@app.route(route="hello")
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
# ファイル保存用のtmp領域
tempFilePath = tempfile.gettempdir()
# Azure Blob Storageからtmp領域へファイルを保存する
container_client = get_container_client()
download_blobs_in_directory(container_client, "some_dir", tempFilePath)
# データ加工処理
process_data()
return func.HttpResponse(f"data processing completed!")
Blob Storageからのファイル取得
FunctionsからBlob Storageに接続し、ファイルを取得する処理。
Blob Storageに接続するための接続文字列は、Azureポータル上から取得する必要があります。
def get_container_client() -> ContainerClient:
# 環境変数から接続文字列を取得
connection_string = os.getenv("AzureWebJobsStorage")
container_name = "excel-data"
# BlobServiceClientを作成
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
return container_client
def download_blobs_in_directory(container_client, directory_name, local_path):
""" blob storageからファイルを取得する """
blobs = container_client.list_blobs(name_starts_with=directory_name)
files = []
for blob in blobs:
blob_client = container_client.get_blob_client(blob)
download_file_path = os.path.join(local_path, blob.name)
files.append(download_file_path)
# ディレクトリを作成
os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
# Blobをダウンロード
with open(download_file_path, "wb") as download_file:
download_file.write(blob_client.download_blob().readall())
return files
Logic AppsでのHTTPアクション設定
作成したFunctionsをデプロイ後、Logic AppsでHTTPアクションを追加します。
HTTPアクションの設定内容
- メソッド:
GET
- URI: FunctionsのエンドポイントURL(Azureのポータル画面から取得)
HTTPアクション使用時のタイムアウト回避方法 (Durable Functions)
通常は前述の定義方法でFunctionsにデプロイすれば、Functionsを呼び出せます。
ただし、HTTPアクションのタイムアウト時間は、マルチテナントで120秒であるため、処理時間の長い関数を実行する場合は、タイムアウトしてしまいます。
これを回避するためにDurable Functionsを利用しました。
Durable Functionsとは
Durable Functions は、サーバーレス コンピューティング環境でステートフル関数を記述できる Azure Functions の機能です。 この拡張機能では、Azure Functions プログラミング モデルを使用して、オーケストレーター関数を記述することでステートフル ワークフローを定義でき、エンティティ関数を記述することでステートフル エンティティを定義できます。 拡張機能によって状態、チェックポイント、再起動がバックグラウンドで管理されるため、ユーザーはビジネス ロジックに専念できます。
Durable Functionsを利用することで以下のメリットがあります。
- タイムアウト回避: データ量が多い場合、通常のHTTPトリガーでは10分以内に処理が終わらない可能性があるが、Durable Functionsなら長時間処理が可能
- 再試行性: データ加工中にエラーが発生しても、オーケストレーターが状態を保持しているため、再実行が簡単
- スケーラビリティ: サーバーレスで自動スケールするため、大量のデータを処理する際も対応可能
Durable Functionsの定義
Durable Functionsは以下の3つの主要な役割で構成されます
- オーケストレーター関数 (Orchestrator Function)
- ワークフローのロジックを定義する中心的な関数
- 他の関数(アクティビティ関数)を呼び出し、処理の流れを制御
- 例: 「ステップ1が終わったらステップ2を実行する」といったシーケンスを記述
- アクティビティ関数 (Activity Function)
- 実際の処理(例: データ加工、外部API呼び出し)を行う個別の関数
- オーケストレーターから呼び出され、結果を返す
- クライアント関数 (Client Function)
- ワークフローを開始するためのエントリーポイント
- HTTPトリガーやタイマーなどで起動し、オーケストレーターをキックオフ
まず、HTTPトリガーとなるクライアント関数を作成します。リクエストパラメータなどを処理しオーケストレーターを起動します。
import os
import sys
import shutil
import tempfile
import traceback
from pathlib import Path
from datetime import timedelta
import azure.functions as func
import azure.durable_functions as df
logger = setup_logging()
df_app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
# An HTTP-triggered function with a Durable Functions client binding
@df_app.route(route="orchestrators/{functionName}")
@df_app.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClient):
logger.info('http_start')
try:
function_name = req.route_params.get('functionName')
input_data = dict(req.params)
if input_data:
logger.info(f"req.param: {input_data}")
instance_id = await client.start_new(function_name, None, input_data)
response = client.create_check_status_response(req, instance_id)
return response
except Exception as e:
logger.error("エラーが発生しました:\n%s", traceback.format_exc())
return func.HttpResponse(f"Error: {str(e)}. Method not allowed", status_code=405)
続いて、アクティビティ関数を呼び出すためオーケストレーター関数を定義します。
# Orchestrator
@df_app.orchestration_trigger(context_name="context")
def exec_orchestrator(context: df.DurableOrchestrationContext):
""" データ加工 & 取込処理呼出し """
result = yield context.call_activity("processing_data", None)
return result
最後に、実際のデータ加工を行うアクティビティ関数を定義します。前項のmain関数と処理の流れは変わっていませんが、付与しているアノテーションが異なります。
# Activity
@df_app.activity_trigger(input_name="args")
def processing_data(args: str):
""" データ加工 & 取込処理実行 """
logger.info('Python HTTP trigger function processed a request')
try:
# ファイル保存用のtmp領域
tempFilePath = tempfile.gettempdir()
# Azure Blob Storageからtmp領域へファイルを保存する
container_client = get_container_client()
download_blobs_in_directory(container_client, "some_dir", tempFilePath)
# データ加工処理
process_data()
return 'completed'
except Exception as e:
logger.error("エラーが発生しました:\n%s", traceback.format_exc())
return f"Error: {str(e)}"
Logic Appsの調整
Functionsを呼び出す、Logic AppsのHTTPコネクタの設定を変更します。
- HTTPアクションの「非同期パターン(Asynchronous pattern)」を有効化
- 必要に応じて「タイムアウト時間(Action timeout)」を指定(ここでは20分を指定)
- URIを変更(http_start関数にバインドしたエンドポイント)
後編につづく
ここまでで、BoxからExcelデータを取得し、Logic AppsでFunctionsを呼び出し、タイムアウトを回避する仕組みを構築しました。
後編では、SQL Databaseとの接続の接続方法について記述します。