はじめに
Azure Synapse AnalyticsのSpark Poolを使ってETL処理を開発する機会があったので、備忘録として本記事を執筆しました。
シナリオ
データレイクに日次で連携される1億行のデータをSparkでETL処理してデータウェアハウスに連携します。データレイクの構成は以下のイメージで、本シナリオのETL処理対象のデータはmanufacture
配下に格納されます。
#Azure Data Lake Storage Gen2
workspace/
└── file/ # バッチ用データ
├── manufacture/ # 製造データ(今回のETL処理対象)
│ ├── sendai/ # 仙台工場
│ │ ├── 20250101/ # 2025年1月1日
│ │ │ ├── manufacture_20250101_001.csv
│ │ │ ├── manufacture_20250101_002.csv
│ │ │ ├── manufacture_20250101_xxx.csv
│ │ ├── 20250102/ # 2025年1月2日
│ │ ├── ... # 他の日付
│ ├── osaka/ # 大阪工場
│ ├── nagoya/ # 名古屋工場
└── sales/ # 販売データ
└── ... # その他データ
ソリューション構成
- データレイク:Azure Data Lake Storage Gen2
- データウェアハウス:Azure Synapse Analyticのデルタテーブル
- パイプライン:Synapse Pipeline
事前準備
1. Spark Poolの作成
ETL処理を実行するSpark Poolを作成します。
今回は検証用のSpark Poolになるため最低限のスペックで作成します。
# | 項目 | 設定値 |
---|---|---|
1 | Apache Sparkプール名 | <任意の名前> |
2 | ノードサイズファミリ | メモリ最適化済み |
3 | ノードサイズ | Small(4仮想コア/32GB) |
4 | 自動スケーリング | 無効 |
5 | ノード数 | 3 |
6 | エグゼキューターを動的に割り当てる | 無効 |
2. デルタテーブルの作成
Synapseのノートブックを新規作成してDeltaLake上にスキーマとテーブルを作成します。
ノートブックの使い方は下記リンクを参照してください。
まずはスキーマを作成します。
CREATE SCHEMA dwh;
次にシンクとなるデルタテーブルを作成します。
CREATE TABLE dwh.t_manufacture(
transaction_id STRING NOT NULL,
factory_id STRING NOT NULL,
product_id STRING NOT NULL,
quantity INT,
Unit STRING,
status STRING,
created_at timestamp,
batch_datetime timestamp NOT NULL,
etl_user STRING NOT NULL
)USING DELTA
PARTITIONED BY (factory_id);
スキーマとデルタテーブルを作成すると以下のようにレイクデータベースにdwh.t_manufacture
が表示されます。
3. テストデータの作成
PySparkでテストデータを作成します。下記スクリプトを実行すると1億行のデータが作成されます。
from pyspark.sql import Row
from pyspark.sql.functions import expr
import uuid
import random
from datetime import datetime, timedelta
num_records = 100000000 # 1億件
status_choices = ["in_progress", "completed", "failed"]
start_date = datetime(2025, 1, 1)
#Spark の range() を使って分散データを作成
rdd = spark.range(num_records).rdd.map(lambda i: Row(
transaction_id=str(uuid.uuid4()),
factory_id="Plant001",
product_id="Product003",
quantity=random.randint(1, 100),
unit="g",
status=random.choice(status_choices),
created_at=start_date + timedelta(milliseconds=int(i.id))
))
#RDDからDataFrameを作成
df = spark.createDataFrame(rdd)
#DataFrameのカラム型を修正
df = df.withColumn("quantity", df["quantity"].cast("int")) \
.withColumn("created_at", df["created_at"].cast("timestamp"))
作成したテストデータをcsv形式にしてデータレイクに格納します。仙台工場の2025年1月1日の製造データという想定で格納先のデータレイクのパスを指定します。
adls_path = "abfss://xxx@xxx.dfs.core.windows.net/synapse/workspaces/file/manufacture/sendai/20250101/"
df.write.format("csv").option("sep", "\t").option("header","true").save(adls_path)
#テストデータ格納先
workspace/
└── file/ # バッチ用データ
├── manufacture/ # 製造データ(今回のETL処理対象)
│ ├── sendai/ # 仙台工場
│ │ ├── 20250101/ # 2025年1月1日
│ │ │ ├── manufacture_20250101_001.csv
│ │ │ ├── manufacture_20250101_002.csv
│ │ │ ├── manufacture_20250101_xxx.csv
これで開発の準備が整いました。
開発✨
開発対象は4つです。
# | 内容 | 説明 |
---|---|---|
1 | Apache Spark構成 | SparkJobの制御およびSparkJob内で参照するPropertyを定義します |
2 | ETL処理スクリプト | SparkJobで実行するスクリプトを開発します |
3 | Apache Sparkジョブ定義 | SparkJobで実行するETL処理スクリプト、SparkPool、Apache Spark構成を紐づけます |
4 | Synapse Pipeline | SparkJobを実行するパイプラインを開発します |
1. Apache Spark構成
Apache Spark構成を作成します。Sparkの制御に使うPropertyとSparkjobで参照するPropertyを定義できます。例えば以下のような使い方ができます。
Property | 例 |
---|---|
Sparkの制御に使うProperty | spark.executor.maxNumFailuresを定義し、Executorで許容する失敗の数を制御する |
SparkJobで参照するProperty | データレイクのパスをspark.AdlsBasePathとして定義し、SparkJob内でspark.AdlsBasePathの値を変数に設定してファイルを読み取る |
今回は4つのPropertyを設定します。
# | Property | 設定値 | 目的 | 備考 |
---|---|---|---|---|
1 | spark.microsoft.delta.optimizeWrite.enabled | true | 制御 | デルタテーブルの書き込みパフォーマンス向上 |
2 | spark.adlsBasePath | <パス> | 参照 | データレイクのベースとなるパス |
3 | spark.manufacture | manufacture | 参照 | データレイクのフォルダ名 |
4 | spark.etlUser | Bython315 | 参照 | デルタテーブルのデータ項目の固定値 |
2. ETL処理スクリプト
ETL処理スクリプトはPySparkを用いて開発します。シンプルに抽出・変換・ロードの各処理を関数として開発し、main関数で各関数を呼び出し実行します。
import sys
import logging
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, lit, to_utc_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
#ログ出力の設定
def setup_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
#SparkSessionの定義
def get_spark_session() -> SparkSession:
spark = SparkSession.builder.appName("Etl_Manufacture").getOrCreate()
return spark
#データレイクからデータを読み取る
def extract_manufacture_data(spark: SparkSession, adls_file_path: str) -> DataFrame:
schema = StructType([
StructField("transaction_id", StringType()),
StructField("factory_id", StringType()),
StructField("product_id", StringType()),
StructField("quantity", IntegerType()),
StructField("Unit", StringType()),
StructField("status", StringType()),
StructField("created_at", TimestampType())
])
df_extracted = spark.read.format("csv").option("header",True).option("sep", "\t").schema(schema).load(adls_file_path)
return df_extracted
#読み取ったデータを変換する(今回はカラムの追加のみ)
def transform_manufacture_data(df_extracted: DataFrame, etl_user: str) -> DataFrame:
df_transformed = df_extracted.withColumn("batch_datetime", to_utc_timestamp(current_timestamp(), "UTC"))
df_transformed = df_transformed.withColumn("etl_user", lit(etl_user))
return df_transformed
#変換後のデータをデータウェアハウスに格納する(今回はAppendモードで要件が満たされているとする)
def load_data_to_delta(df_transformed: DataFrame) -> None:
df_transformed.write.format("delta").mode("append").saveAsTable("dwh.t_manufacture")
def main(if_location: str, if_date: str) ->None:
try:
setup_logging()
spark = get_spark_session()
#Apache Spark構成のPropertyを参照して変数に使用する
adls_base_path = spark.conf.get('spark.adlsBasePath')
if_name = spark.conf.get('spark.manufacture')
etl_user = spark.conf.get('spark.etlUser')
#データ抽出元のデータレイクパスを定義
adls_file_path = f"{adls_base_path}{if_name}/{if_location}/{if_date}"
#データ抽出
df_manufacture_extracted = extract_manufacture_data(spark, adls_file_path)
#データ変換
df_transformed = transform_manufacture_data(df_manufacture_extracted, etl_user)
#データロード
load_data_to_delta(df_transformed)
except Exception as e:
logging.error(f"Error in etl process: {e}")
raise
if __name__ == '__main__':
#Synaspeパイプラインから引数を受け取る
if_location = sys.argv[1]
if_date = sys.argv[2]
#main関数の実行
main(if_location, if_date)
後述のSynapse PipelineからETL処理スクリプトを実行する時にif_location
とif_date
を引数として渡すことで、抽出対象のデータレイクのパスを動的に指定できるようにしています。
また今回はETL処理スクリプト内で抽出・変換・ロードの各関数を定義していますが、開発規模が大きい場合、各処理をカスタムモジュール(例: extract.py、 transform.py、 load.py)として再利用しやすい形にするのが望ましいと思います。コードの可読性、保守性、テストのしやすさが向上します。
開発したETL処理スクリプトはADLS Gen2(Azure Data Lake Storage Gen2)に格納します。
#Azure Data Lake Storage Gen2
workspace/
└── batchjobs/ # バッチ処理
└── IngestManufactureData/ # 製造データのETL処理
└── IngestManufactureData.py # 実行スクリプト
ADLS Gen2に格納したETL処理スクリプトを後ほどの手順で開発するSynapse Pipelineで実行します。
3. Apache Sparkジョブ定義
Apache Sparkジョブ定義は、Synapse PipelineのSparkジョブ定義アクティビティにおいて既定で実行するSparkJob、使用するSparkPool、Apache Spark構成をパッケージ化する役割を担います。
Apache Sparkジョブ定義の設定は以下の通りです。
# | 項目 | 設定値 | 備考 |
---|---|---|---|
1 | メイン定義ファイル | <ADLS Gen2に格納したETL処理スクリプトのパス> | abfss://形式のパスを指定する必要があります(※) |
2 | Apache Sparkプール | <作成したSpark Pool名> | |
3 | Executorのサイズ | Small(4仮想コア/28GB) | |
4 | エグゼキューターを動的に割り当てる | 無効 | |
5 | Executor | 2 | |
6 | Apache Spark構成 | <Apache Spark構成名> |
※adfss://形式はADLS Gen2のURIスキームです。Synapse、Databricks、Fabric等のデータ分析ソリューションを使用する場合はADFSSでパスを指定する必要があります。
4. Synapse Pipeline
最後にSynapse Pipelineを開発しましょう。Synapse PipelineのSparkジョブ定義アクティビティを使用します。Sparkジョブ定義アクティビティによってADLS Gen2に格納したETL処理スクリプトを実行することができます。
アクティビティタブ > Synapse > Sparkジョブ定義をドラッグ&ドロップでキャンバスに配置します。
パイプラインパラメーターを追加します。どちらもETL処理スクリプトに渡す引数となります。パイプラインパラメータはトリガー実行時に外部から指定することができます。
次にSparkジョブ定義アクティビティをクリックし、設定タブで設定を選択します。Sparkジョブ定義は作成済みのApache Sparkジョブ定義を選択します。コマンドライン引数には、直前に追加したif_location
とif_date
を指定します。
最後にパイプラインを発行して完了です。
パイプラインを実行してみる
開発したSynapse Pipelineのトリガーの追加 > 今すぐトリガーを選択します。今すぐトリガーを選択すると実行画面が表示されるので、パラメータを以下に設定してOKを押します。
トリガー実行時に指定したパラメータが引数としてETL処理スクリプトに渡されます。ETL処理における抽出対象のデータレイクのパスは以下となり、仙台工場の2025年1月1日のデータが対象となります。
#Azure Data Lake Storage Gen2
workspace/
└── file/ # バッチ用データ
├── manufacture/ # 製造データ(今回のETL処理対象)
│ ├── sendai/ # 仙台工場
│ │ ├── 20250101/ # 2025年1月1日
│ │ │ ├── manufacture_20250101_001.csv
│ │ │ ├── manufacture_20250101_002.csv
│ │ │ ├── manufacture_20250101_xxx.csv
パイプラインの実行が成功しました。約7分で完了しました。
ETL処理後のdwh.t_manufacture
テーブルです。etl_user
はApache Spark構成のPropertyから取得できていますね。
最後に
はじめてのQiita投稿でしたが執筆に8時間もかかるとは思いませんでした(笑)
最後までお読みいただきありがとうございました。