1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Synapse AnalyticsのSpark Poolで1億行のデータをETL処理してみた!

Last updated at Posted at 2025-03-11

はじめに

Azure Synapse AnalyticsのSpark Poolを使ってETL処理を開発する機会があったので、備忘録として本記事を執筆しました。

シナリオ

データレイクに日次で連携される1億行のデータをSparkでETL処理してデータウェアハウスに連携します。データレイクの構成は以下のイメージで、本シナリオのETL処理対象のデータはmanufacture配下に格納されます。

#Azure Data Lake Storage Gen2

workspace/                    
└── file/                                 # バッチ用データ
    ├── manufacture/                      # 製造データ(今回のETL処理対象)
    │   ├── sendai/                       # 仙台工場
    │   │   ├── 20250101/                 # 2025年1月1日
    │   │   │   ├── manufacture_20250101_001.csv 
    │   │   │   ├── manufacture_20250101_002.csv  
    │   │   │   ├── manufacture_20250101_xxx.csv  
    │   │   ├── 20250102/                 # 2025年1月2日
    │   │   ├── ...                       # 他の日付
    │   ├── osaka/                        # 大阪工場
    │   ├── nagoya/                       # 名古屋工場
    └── sales/                            # 販売データ
    └── ...                              # その他データ

ソリューション構成

  • データレイク:Azure Data Lake Storage Gen2
  • データウェアハウス:Azure Synapse Analyticのデルタテーブル
  • パイプライン:Synapse Pipeline

image.png

事前準備

1. Spark Poolの作成

ETL処理を実行するSpark Poolを作成します。

今回は検証用のSpark Poolになるため最低限のスペックで作成します。

# 項目 設定値
1 Apache Sparkプール名 <任意の名前>
2 ノードサイズファミリ メモリ最適化済み
3 ノードサイズ Small(4仮想コア/32GB)
4 自動スケーリング 無効
5 ノード数 3
6 エグゼキューターを動的に割り当てる 無効

2. デルタテーブルの作成

Synapseのノートブックを新規作成してDeltaLake上にスキーマとテーブルを作成します。
ノートブックの使い方は下記リンクを参照してください。

まずはスキーマを作成します。

SQL
CREATE SCHEMA dwh;

次にシンクとなるデルタテーブルを作成します。

SQL
CREATE TABLE dwh.t_manufacture(
transaction_id STRING NOT NULL,
factory_id STRING NOT NULL,
product_id STRING NOT NULL,
quantity INT,
Unit STRING,
status STRING,
created_at timestamp,
batch_datetime timestamp NOT NULL,
etl_user STRING NOT NULL
)USING DELTA
PARTITIONED BY (factory_id);

スキーマとデルタテーブルを作成すると以下のようにレイクデータベースにdwh.t_manufactureが表示されます。

image.png

3. テストデータの作成

PySparkでテストデータを作成します。下記スクリプトを実行すると1億行のデータが作成されます。

PySpark
from pyspark.sql import Row
from pyspark.sql.functions import expr
import uuid
import random
from datetime import datetime, timedelta

num_records = 100000000  # 1億件
status_choices = ["in_progress", "completed", "failed"]
start_date = datetime(2025, 1, 1)

#Spark の range() を使って分散データを作成
rdd = spark.range(num_records).rdd.map(lambda i: Row(
    transaction_id=str(uuid.uuid4()),
    factory_id="Plant001",
    product_id="Product003",
    quantity=random.randint(1, 100),
    unit="g",
    status=random.choice(status_choices),
    created_at=start_date + timedelta(milliseconds=int(i.id)) 
))

#RDDからDataFrameを作成
df = spark.createDataFrame(rdd)

#DataFrameのカラム型を修正
df = df.withColumn("quantity", df["quantity"].cast("int")) \
       .withColumn("created_at", df["created_at"].cast("timestamp"))

作成したテストデータをcsv形式にしてデータレイクに格納します。仙台工場の2025年1月1日の製造データという想定で格納先のデータレイクのパスを指定します。

pyspark
adls_path = "abfss://xxx@xxx.dfs.core.windows.net/synapse/workspaces/file/manufacture/sendai/20250101/"

df.write.format("csv").option("sep", "\t").option("header","true").save(adls_path)
#テストデータ格納先

workspace/                    
└── file/                                 # バッチ用データ
    ├── manufacture/                      # 製造データ(今回のETL処理対象)
    │   ├── sendai/                       # 仙台工場
    │   │   ├── 20250101/                 # 2025年1月1日
    │   │   │   ├── manufacture_20250101_001.csv 
    │   │   │   ├── manufacture_20250101_002.csv  
    │   │   │   ├── manufacture_20250101_xxx.csv  

指定したデータレイクのパスにファイルが格納されました。
image.png

これで開発の準備が整いました。

開発✨

開発対象は4つです。

# 内容 説明
1 Apache Spark構成 SparkJobの制御およびSparkJob内で参照するPropertyを定義します
2 ETL処理スクリプト SparkJobで実行するスクリプトを開発します
3 Apache Sparkジョブ定義 SparkJobで実行するETL処理スクリプト、SparkPool、Apache Spark構成を紐づけます
4 Synapse Pipeline SparkJobを実行するパイプラインを開発します

1. Apache Spark構成

Apache Spark構成を作成します。Sparkの制御に使うPropertyとSparkjobで参照するPropertyを定義できます。例えば以下のような使い方ができます。

Property
Sparkの制御に使うProperty spark.executor.maxNumFailuresを定義し、Executorで許容する失敗の数を制御する
SparkJobで参照するProperty データレイクのパスをspark.AdlsBasePathとして定義し、SparkJob内でspark.AdlsBasePathの値を変数に設定してファイルを読み取る

今回は4つのPropertyを設定します。

# Property 設定値 目的 備考
1 spark.microsoft.delta.optimizeWrite.enabled true 制御  デルタテーブルの書き込みパフォーマンス向上
2 spark.adlsBasePath <パス> 参照 データレイクのベースとなるパス
3 spark.manufacture manufacture 参照 データレイクのフォルダ名
4 spark.etlUser Bython315 参照 デルタテーブルのデータ項目の固定値

2. ETL処理スクリプト

ETL処理スクリプトはPySparkを用いて開発します。シンプルに抽出・変換・ロードの各処理を関数として開発し、main関数で各関数を呼び出し実行します。

IngestManufactureData.py
import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, lit, to_utc_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

#ログ出力の設定
def setup_logging() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )


#SparkSessionの定義
def get_spark_session() -> SparkSession:
    spark = SparkSession.builder.appName("Etl_Manufacture").getOrCreate()
    return spark


#データレイクからデータを読み取る
def extract_manufacture_data(spark: SparkSession, adls_file_path: str) -> DataFrame:
    schema = StructType([
        StructField("transaction_id", StringType()),
        StructField("factory_id", StringType()), 
        StructField("product_id", StringType()),
        StructField("quantity", IntegerType()),
        StructField("Unit", StringType()),
        StructField("status", StringType()),
        StructField("created_at", TimestampType())
    ])

    df_extracted = spark.read.format("csv").option("header",True).option("sep", "\t").schema(schema).load(adls_file_path)
    return df_extracted


#読み取ったデータを変換する(今回はカラムの追加のみ)
def transform_manufacture_data(df_extracted: DataFrame, etl_user: str) -> DataFrame:
    df_transformed = df_extracted.withColumn("batch_datetime", to_utc_timestamp(current_timestamp(), "UTC"))
    df_transformed = df_transformed.withColumn("etl_user", lit(etl_user))
    return df_transformed


#変換後のデータをデータウェアハウスに格納する(今回はAppendモードで要件が満たされているとする)
def load_data_to_delta(df_transformed: DataFrame) -> None:
    df_transformed.write.format("delta").mode("append").saveAsTable("dwh.t_manufacture")


def main(if_location: str, if_date: str) ->None:
    try:
        setup_logging()
        spark = get_spark_session()

        #Apache Spark構成のPropertyを参照して変数に使用する
        adls_base_path = spark.conf.get('spark.adlsBasePath')
        if_name = spark.conf.get('spark.manufacture')
        etl_user = spark.conf.get('spark.etlUser')

        #データ抽出元のデータレイクパスを定義
        adls_file_path = f"{adls_base_path}{if_name}/{if_location}/{if_date}"

        #データ抽出
        df_manufacture_extracted = extract_manufacture_data(spark, adls_file_path)
    
        #データ変換
        df_transformed = transform_manufacture_data(df_manufacture_extracted, etl_user)

        #データロード
        load_data_to_delta(df_transformed)

    except Exception as e:
        logging.error(f"Error in etl process: {e}")
        raise


if __name__ == '__main__':
    
    #Synaspeパイプラインから引数を受け取る
    if_location = sys.argv[1]
    if_date = sys.argv[2]
    
    #main関数の実行
    main(if_location, if_date)

後述のSynapse PipelineからETL処理スクリプトを実行する時にif_locationif_dateを引数として渡すことで、抽出対象のデータレイクのパスを動的に指定できるようにしています。

また今回はETL処理スクリプト内で抽出・変換・ロードの各関数を定義していますが、開発規模が大きい場合、各処理をカスタムモジュール(例: extract.py、 transform.py、 load.py)として再利用しやすい形にするのが望ましいと思います。コードの可読性、保守性、テストのしやすさが向上します。

開発したETL処理スクリプトはADLS Gen2(Azure Data Lake Storage Gen2)に格納します。

#Azure Data Lake Storage Gen2

workspace/                            
└── batchjobs/                         # バッチ処理
    └── IngestManufactureData/         # 製造データのETL処理
        └── IngestManufactureData.py   # 実行スクリプト

ADLS Gen2に格納したETL処理スクリプトを後ほどの手順で開発するSynapse Pipelineで実行します。

3. Apache Sparkジョブ定義

Apache Sparkジョブ定義は、Synapse PipelineのSparkジョブ定義アクティビティにおいて既定で実行するSparkJob、使用するSparkPool、Apache Spark構成をパッケージ化する役割を担います。

Apache Sparkジョブ定義の設定は以下の通りです。

# 項目 設定値 備考
1 メイン定義ファイル <ADLS Gen2に格納したETL処理スクリプトのパス> abfss://形式のパスを指定する必要があります(※)
2 Apache Sparkプール <作成したSpark Pool名>
3 Executorのサイズ Small(4仮想コア/28GB)
4 エグゼキューターを動的に割り当てる 無効
5 Executor 2
6 Apache Spark構成 <Apache Spark構成名>

※adfss://形式はADLS Gen2のURIスキームです。Synapse、Databricks、Fabric等のデータ分析ソリューションを使用する場合はADFSSでパスを指定する必要があります。

4. Synapse Pipeline

最後にSynapse Pipelineを開発しましょう。Synapse PipelineのSparkジョブ定義アクティビティを使用します。Sparkジョブ定義アクティビティによってADLS Gen2に格納したETL処理スクリプトを実行することができます。

アクティビティタブ > Synapse > Sparkジョブ定義をドラッグ&ドロップでキャンバスに配置します。

image.png

パイプラインパラメーターを追加します。どちらもETL処理スクリプトに渡す引数となります。パイプラインパラメータはトリガー実行時に外部から指定することができます。

image.png

次にSparkジョブ定義アクティビティをクリックし、設定タブで設定を選択します。Sparkジョブ定義は作成済みのApache Sparkジョブ定義を選択します。コマンドライン引数には、直前に追加したif_locationif_dateを指定します。

image.png

最後にパイプラインを発行して完了です。

パイプラインを実行してみる

開発したSynapse Pipelineのトリガーの追加 > 今すぐトリガーを選択します。今すぐトリガーを選択すると実行画面が表示されるので、パラメータを以下に設定してOKを押します。

image.png

トリガー実行時に指定したパラメータが引数としてETL処理スクリプトに渡されます。ETL処理における抽出対象のデータレイクのパスは以下となり、仙台工場の2025年1月1日のデータが対象となります。

#Azure Data Lake Storage Gen2

workspace/                    
└── file/                                 # バッチ用データ
    ├── manufacture/                      # 製造データ(今回のETL処理対象)
    │   ├── sendai/                       # 仙台工場
    │   │   ├── 20250101/                 # 2025年1月1日
    │   │   │   ├── manufacture_20250101_001.csv 
    │   │   │   ├── manufacture_20250101_002.csv  
    │   │   │   ├── manufacture_20250101_xxx.csv  


パイプラインの実行が成功しました。約7分で完了しました。

image.png


データを確認すると1億行格納されていました✨

image.png


ETL処理後のdwh.t_manufactureテーブルです。etl_userはApache Spark構成のPropertyから取得できていますね。

image.png

最後に

はじめてのQiita投稿でしたが執筆に8時間もかかるとは思いませんでした(笑)
最後までお読みいただきありがとうございました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?