1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azureを活用したExcelデータ活用基盤の構築(前編)

Last updated at Posted at 2025-03-31

はじめに

Excelで管理しているデータをAzure基盤上に移行し、データ活用基盤を構築する取り組みを行いました。

具体的には、Boxに保存されているExcelファイルをAzureに連携、Azure Functionsでデータ加工を行い、SQL Databaseに保存するフローをAzure Logic Appsで構築。SQL Databaseに蓄積したデータを、最終的にPowerBIで可視化する仕組みを構築しました。

このシステムを構築するうえでポイントとなった技術について前編、後篇に分け投稿します。

BoxとAzure Logic Appsの連携

Logic Appsの「Boxコネクタ」を利用することでコードを記述することなく簡単に接続できます。手順は以下の通りです。

Boxアプリの設定

  • Boxの開発者コンソールでカスタムアプリを作成
  • OAuth 2.0認証を設定

Logic Apps上でBoxコネクタを作成

  • Box上に保存してある複数のファイルを取得するために以下の選択を行う
    • Logic Apps上で、「アクションの追加」> Boxコネクタの「フォルダー内のファイルとフォルダーのリスト」を選択
      image.png
  • Boxアカウントで認証を行い、対象フォルダを指定

Boxで取得したファイルをAzure BlobStorageに保存

後続のAzure Functions内で加工するため、一度、AzureのBlob Storageに保存します。Blob Storageへの保存は、Blob Storageコネクタを使用します。

  • 取得したBox内のファイルリストの結果を「For Each」アクションでループ
  • ループ内で取得できるBoxのファイルパスから、Boxコネクタの「パスによるファイルコンテンツの取得」を使用してファイルを取得
  • Blob Storageコネクタの「BLOBを作成する(V2)」を選択し、取得したファイルをBlob Storageに保存
    image.png

上記の手順で以下の様なフローが作成できます。
image.png

Logic AppsからAzure Functionsを呼び出す (HTTPアクション)

取り込んだエクセルのデータを加工し、Databaseに保存するため、Azure Functionsを利用します。

Logic AppsからFunctionsを呼び出すには、HTTPアクションを使用します。
※Functionsアクションもありますが、従量課金プランではFunctionsアクションは使えなかったため、HTTPコネクタを使用してFunctionsを呼び出しています。

Functionsの作成

以下の定義を行うことで、HTTPリクエストをトリガーに関数を実行することが出来ます。
本関数内では、GETリクエストを受けたのち、Blob Storageのファイルをtmp領域に保存し、その後、データ加工処理を行っています。

    import os
    import sys
    import shutil
    import tempfile
    import traceback
    from pathlib import Path
    
    from azure.storage.blob import BlobServiceClient
    from azure.storage.blob import ContainerClient

    import azure.functions as func
    import logging
    
    app = func.FunctionApp()
    
    @app.function_name(name="MyHttpTrigger")
    @app.route(route="hello")
    def main(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Python HTTP trigger function processed a request.')

        # ファイル保存用のtmp領域
        tempFilePath = tempfile.gettempdir()

        # Azure Blob Storageからtmp領域へファイルを保存する
        container_client = get_container_client()
        download_blobs_in_directory(container_client, "some_dir", tempFilePath)
    
        # データ加工処理
        process_data()
    
        return func.HttpResponse(f"data processing completed!")

Blob Storageからのファイル取得

FunctionsからBlob Storageに接続し、ファイルを取得する処理。
Blob Storageに接続するための接続文字列は、Azureポータル上から取得する必要があります。

    def get_container_client() -> ContainerClient:
        # 環境変数から接続文字列を取得
        connection_string = os.getenv("AzureWebJobsStorage")
        container_name = "excel-data"
    
        # BlobServiceClientを作成
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        return container_client
    
    
    def download_blobs_in_directory(container_client, directory_name, local_path):
        """ blob storageからファイルを取得する """
        blobs = container_client.list_blobs(name_starts_with=directory_name)
        files = []
        for blob in blobs:
            blob_client = container_client.get_blob_client(blob)
            download_file_path = os.path.join(local_path, blob.name)
            files.append(download_file_path)
            # ディレクトリを作成
            os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
    
            # Blobをダウンロード
            with open(download_file_path, "wb") as download_file:
                download_file.write(blob_client.download_blob().readall())
    
        return files
    

Logic AppsでのHTTPアクション設定

作成したFunctionsをデプロイ後、Logic AppsでHTTPアクションを追加します。

HTTPアクションの設定内容

  • メソッド: GET
  • URI: FunctionsのエンドポイントURL(Azureのポータル画面から取得)

HTTPアクション使用時のタイムアウト回避方法 (Durable Functions)

通常は前述の定義方法でFunctionsにデプロイすれば、Functionsを呼び出せます。

ただし、HTTPアクションのタイムアウト時間は、マルチテナントで120秒であるため、処理時間の長い関数を実行する場合は、タイムアウトしてしまいます。

これを回避するためにDurable Functionsを利用しました。

Durable Functionsとは

Durable Functions は、サーバーレス コンピューティング環境でステートフル関数を記述できる Azure Functions の機能です。 この拡張機能では、Azure Functions プログラミング モデルを使用して、オーケストレーター関数を記述することでステートフル ワークフローを定義でき、エンティティ関数を記述することでステートフル エンティティを定義できます。 拡張機能によって状態、チェックポイント、再起動がバックグラウンドで管理されるため、ユーザーはビジネス ロジックに専念できます。

Durable Functionsを利用することで以下のメリットがあります。

  • タイムアウト回避: データ量が多い場合、通常のHTTPトリガーでは10分以内に処理が終わらない可能性があるが、Durable Functionsなら長時間処理が可能
  • 再試行性: データ加工中にエラーが発生しても、オーケストレーターが状態を保持しているため、再実行が簡単
  • スケーラビリティ: サーバーレスで自動スケールするため、大量のデータを処理する際も対応可能

Durable Functionsの定義

Durable Functionsは以下の3つの主要な役割で構成されます

  • オーケストレーター関数 (Orchestrator Function)
    • ワークフローのロジックを定義する中心的な関数
    • 他の関数(アクティビティ関数)を呼び出し、処理の流れを制御
    • 例: 「ステップ1が終わったらステップ2を実行する」といったシーケンスを記述
  • アクティビティ関数 (Activity Function)
    • 実際の処理(例: データ加工、外部API呼び出し)を行う個別の関数
    • オーケストレーターから呼び出され、結果を返す
  • クライアント関数 (Client Function)
    • ワークフローを開始するためのエントリーポイント
    • HTTPトリガーやタイマーなどで起動し、オーケストレーターをキックオフ

まず、HTTPトリガーとなるクライアント関数を作成します。リクエストパラメータなどを処理しオーケストレーターを起動します。

    import os
    import sys
    import shutil
    import tempfile
    import traceback
    from pathlib import Path
    from datetime import timedelta
        
    import azure.functions as func
    import azure.durable_functions as df
    
   
    logger = setup_logging()
    
    df_app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    
    # An HTTP-triggered function with a Durable Functions client binding
    @df_app.route(route="orchestrators/{functionName}")
    @df_app.durable_client_input(client_name="client")
    async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClient):
        logger.info('http_start')
        try:
            function_name = req.route_params.get('functionName')
    
            input_data = dict(req.params)
            if input_data:
                logger.info(f"req.param: {input_data}")
    
            instance_id = await client.start_new(function_name, None, input_data)
            response = client.create_check_status_response(req, instance_id)
            return response
        except Exception as e:
            logger.error("エラーが発生しました:\n%s", traceback.format_exc())
            return func.HttpResponse(f"Error: {str(e)}. Method not allowed", status_code=405)

続いて、アクティビティ関数を呼び出すためオーケストレーター関数を定義します。

    # Orchestrator
    @df_app.orchestration_trigger(context_name="context")
    def exec_orchestrator(context: df.DurableOrchestrationContext):
        """ データ加工 & 取込処理呼出し """
        result = yield context.call_activity("processing_data", None)
        return result
    

最後に、実際のデータ加工を行うアクティビティ関数を定義します。前項のmain関数と処理の流れは変わっていませんが、付与しているアノテーションが異なります。

    # Activity
    @df_app.activity_trigger(input_name="args")
    def processing_data(args: str):
        """ データ加工 & 取込処理実行 """
        logger.info('Python HTTP trigger function processed a request')
    
        try:
            # ファイル保存用のtmp領域
            tempFilePath = tempfile.gettempdir()
    
            # Azure Blob Storageからtmp領域へファイルを保存する
            container_client = get_container_client()
            download_blobs_in_directory(container_client, "some_dir", tempFilePath)
        
            # データ加工処理
            process_data()
   
            return 'completed'
        except Exception as e:
            logger.error("エラーが発生しました:\n%s", traceback.format_exc())
            return f"Error: {str(e)}"

Logic Appsの調整

Functionsを呼び出す、Logic AppsのHTTPコネクタの設定を変更します。

  • HTTPアクションの「非同期パターン(Asynchronous pattern)」を有効化
  • 必要に応じて「タイムアウト時間(Action timeout)」を指定(ここでは20分を指定)
  • URIを変更(http_start関数にバインドしたエンドポイント)
    • https://xxxxx.azurewebsites.net/api/orchestrators/exec_orchestrator
    • 設定例
      image.png

後編につづく

ここまでで、BoxからExcelデータを取得し、Logic AppsでFunctionsを呼び出し、タイムアウトを回避する仕組みを構築しました。
後編では、SQL Databaseとの接続の接続方法について記述します。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?