0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【第5回】Glue × PySparkでETLパイプラインを構築(データ加工)

Last updated at Posted at 2025-05-05

🎮 はじめに

これまで、

  • スマブラ対戦データ
  • 気象データ

をローカルから収集し、AWS S3に保存してきました。
今回はいよいよ、AWS GlueとPySparkを使って、
これらのデータを結合・加工するETLパイプラインを構築していきます!

これにより、最終的に

  • キャラクターごとの勝率
  • キャラ間の相性
  • 天候や時間帯ごとの勝率

を分析できるデータが完成します!


🛠 使用技術

  • AWS Glue
  • PySpark(Glueジョブ内で使用)
  • AWS S3(データレイク・データマート)
  • AWS IAM(Glue用ロール設定)

🧱 ETLパイプライン全体像

設計図_5.png

今まで、上記図のローカル環境の部分の開発を進めてきました。
今回は赤線で囲った、データレイク → データウェアハウス → データマート間のETL処理を作っていきます。

データの流れとしては、Crawlerを使ってS3のデータを収集しData Catalogに登録、Data Catalogのデータを元にETLジョブ(pyspark)でデータを加工し、後続のS3に保存します。
この流れを図のように繰り返します。


🛠 実装

Crawler、Data Catalog、ETLジョブ(pyspark)の解説を進めていきます。


①Crawlerの作成

Crawler(クローラー)は、S3のデータソースをスキャンして、スキーマを推測し、Glue Data Catalogにテーブルを作成してくれるサービスです。

右上のCreate crawlerから作成することができます!

クローラー.png

②Data Catalog

Glue Data Catalogは、AWSにおける「データのメタデータ(データの説明書き)」を管理するサービスです。

ざっくりいうと、

  • どこに(S3、RDSなど)
  • どんな形式で(CSV、Parquet、JSONなど)
  • どんなカラムが(カラム名・型) 保存されているか?を登録・管理するための台帳・辞書です。

テーブルが増えてスキーマが増えてくると、どのスキーマが何を表しているか分からなくなってきます。
そういった悩みを解決してくれるのがデータカタログです!

catalog.png

③【ETLジョブ】データレイク → データウェアハウス の処理

テーブル定義 ①.png

次にデータレイク → データウェアハウス間の処理を書いていきます。
ここでは、上記図赤線の日付を共通の形式(yyyy/mm/dd hh:mm)に変換していきます。

実際のコードは以下になります。

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col, date_format
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from awsglue.dynamicframe import DynamicFrame
import sys

# UDF (User Defined Function) の定義
def adjust_datetime(date, time):
    if time == 24:
        new_time = "00:00"
        new_date = datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)
        new_date = new_date.strftime("%Y-%m-%d")
    else:
        new_time = f"{int(time):02}:00"  # 時間を2桁の形式に固定し、分を追加
        new_date = date
    return f"{new_date} {new_time}"

# UDFの登録
adjust_datetime_udf = udf(adjust_datetime, StringType())

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)

# データカタログからデータフレームを読み込む(ここは実際にデータカタログにある名前を指定)
dyf_django = glueContext.create_dynamic_frame.from_catalog(database="smash_datalake", table_name="smash_datalakedjango_data_export_csv")
dyf_weather = glueContext.create_dynamic_frame.from_catalog(database="smash_datalake", table_name="smash_datalakepreterite_weather_export_csv")

df_django = dyf_django.toDF()
df_weather = dyf_weather.toDF()

# Djangoデータの変換: match_timeのフォーマット変更
df_django_transformed = df_django.withColumn("match_time", date_format("match_time", "yyyy-MM-dd HH:mm"))

# Weatherデータの変換: dateとtimeの結合とフォーマット変更、元のdateとtimeカラムを削除
df_weather_transformed = df_weather.withColumn("datetime", adjust_datetime_udf(col("date"), col("time"))) \
    .drop("date") \
    .drop("time")

# datetimeカラムを最前列に移動
column_order = ["datetime"] + [col_name for col_name in df_weather_transformed.columns if col_name != "datetime"]
df_weather_transformed = df_weather_transformed.select(*column_order)

# S3(データウェアハウス)に保存
df_django_transformed.write.mode('overwrite').option("header", "true").csv("s3://your-bucket-name/datawarehouse/GameResults/")
df_weather_transformed.write.mode('overwrite').option("header", "true").csv("s3://your-bucket-name/datawarehouse/Weather/")

sc.stop()

全体の流れとしては、
1. データカタログのデータを、データフレームに読み込み
2. フォーマットを変更(Weatherデータは日付と時間が分かれているのでマージする)
3. S3(データウェアハウス)に保存

となります!

④【ETLジョブ】データウェアハウス → データマート の処理

次にデータレイク → データウェアハウス間の処理を書いていきます。
ここでは、以下の3つのデータマートテーブルを作成しています。

  • CharacterPerformance(個々のキャラクターのパフォーマンス)
  • CharacterMatchupPerformance(二つのキャラクター間のマッチアップのパフォーマンス)
  • WeatherImpactOnWinRates(気象情報と勝率の関係を確認)

実際のコードは以下になります。

import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, count, sum, when, concat, lit, avg, date_format, hour, coalesce, first
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# データカタログからテーブルを読み込む
dyf_game_results = glueContext.create_dynamic_frame.from_catalog(
    database="smash_data_warehouse",
    table_name="gameresults"
)
dyf_character_master = glueContext.create_dynamic_frame.from_catalog(
    database="smash_data_warehouse",
    table_name="charactermaster"
)
dyf_weather = glueContext.create_dynamic_frame.from_catalog(
    database="smash_data_warehouse",
    table_name="weather"
)

# DataFrameに変換
df_game_results = dyf_game_results.toDF()
df_character_master = dyf_character_master.toDF()
df_weather = dyf_weather.toDF()
###############CharacterPerformanceテーブルの作成###############
df_performance = df_game_results.join(df_character_master, df_game_results.my_character_id == df_character_master.character_id)
df_performance = df_performance.groupBy("character_id", "character_name").agg(
    count("*").alias("matches_played"),
    sum(when(col("win_or_loss") == "win", 1).otherwise(0)).alias("wins"),
    sum(when(col("win_or_loss") == "loss", 1).otherwise(0)).alias("losses")
)
df_performance = df_performance.withColumn("win_rate", col("wins") / col("matches_played"))
df_performance = df_performance.select(
    "character_id", 
    "character_name", 
    "matches_played", 
    "wins", 
    "losses", 
    "win_rate"
)
output_path = "s3://your-bucket-name/datamart/CharacterPerformance"
df_performance.write.mode("overwrite").option("header", "true").csv(output_path)
###############CharacterMatchupPerformanceテーブルの作成###############
# DataFrameを使ったジョイン
df_joined = df_game_results.alias("results")
df_characters = df_character_master.alias("characters")

# 自キャラクターの名前を結合
df_joined = df_joined.join(
    df_characters,
    df_joined.my_character_id == df_characters.character_id,
    "left"
).select(
    "results.*", col("characters.character_name").alias("my_character_name")
)

# 相手キャラクターの名前を結合
df_joined = df_joined.join(
    df_characters,
    df_joined.opponent_character_id == df_characters.character_id,
    "left"
).select(
    "results.*", "my_character_name", col("characters.character_name").alias("opponent_character_name")
)

# マッチアップIDを生成
df_joined = df_joined.withColumn(
    "matchup_id",
    concat(col("my_character_id"), lit("-"), col("opponent_character_id"))
)

# 集計処理
df_stats = df_joined.groupBy("matchup_id", "my_character_name", "opponent_character_name").agg(
    count("*").alias("matches_played"),
    sum(when(col("win_or_loss") == "win", 1).otherwise(0)).alias("wins"),
    sum(when(col("win_or_loss") == "loss", 1).otherwise(0)).alias("losses")
)

# 勝率を計算
df_stats = df_stats.withColumn("win_rate", col("wins") / col("matches_played"))

# 結果をS3にCSV形式で保存
output_path_matchup = "s3://your-bucket-name/datamart/CharacterMatchupPerformance/"
df_stats.write.mode("overwrite").option("header", "true").csv(output_path_matchup)
###############WeatherImpactOnWinRatesテーブルの作成##################
# 'date_hour' カラムを追加('yyyy/MM/dd HH'形式)
df_game_results = df_game_results.withColumn("date_hour", date_format("match_time", "yyyy/MM/dd HH"))
df_weather = df_weather.withColumn("date_hour", date_format("datetime", "yyyy/MM/dd HH"))

# 'time_slot' カラムを生成
df_game_results = df_game_results.withColumn(
    "time_slot",
    when((hour("match_time") >= 4) & (hour("match_time") <= 11), "Morning")
    .when((hour("match_time") >= 12) & (hour("match_time") <= 18), "Afternoon")
    .otherwise("Evening")
)

# エイリアスを使用して結合
df_joined_weather = df_game_results.alias("game").join(
    df_weather.alias("weather"), 
    col("game.date_hour") == col("weather.date_hour"),
    "left"
)

# 結合後のデータから必要な情報を選択、降水量がnullの場合は0を使用
df_stats_weather = df_joined_weather.select(
    col("game.date_hour").alias("date"),
    col("game.time_slot").alias("time_slot"),
    col("weather.temperature").alias("temperature"),
    coalesce(col("weather.precipitation"), lit(0)).alias("precipitation"),  # Nullの場合は0を使用
    col("game.win_or_loss").alias("win_or_loss")
)

# 勝率の計算と集計
df_stats_weather = df_stats_weather.withColumn(
    "wins", when(col("win_or_loss") == "win", 1).otherwise(0)
).withColumn(
    "losses", when(col("win_or_loss") == "loss", 1).otherwise(0)
)

df_final_stats = df_stats_weather.groupBy("date", "time_slot").agg(
    first("temperature").alias("temperature"),
    first("precipitation").alias("precipitation"),
    count("date").alias("matches_played"),
    sum("wins").alias("wins"),
    sum("losses").alias("losses"),
    (sum("wins") / count("date")).alias("win_rate")
)

# 結果をS3にCSV形式で保存
output_path_weather = "s3://your-bucket-name/datamart/WeatherImpactOnWinRates"
df_final_stats.write.mode("overwrite").option("header", "true").csv(output_path_weather)

#######################################################################

sc.stop()

基本的にデータフレームワークを使っているので、Pandasを使ったことがある方であれば、すぐにイメージがつくコードかもしれません!

以上でGlue × PySparkでETLパイプラインを構築することができました。


悩んだところ

Glue ETLジョブなのですが、IAMの権限を設定しても権限エラーが消えませんでした。
これは、サービス単位で権限が付与されていないことが原因だったため、もしIAMの設定が問題ないのに権限エラーが出る場合は、AWSサポートに問い合わせてみてください!


🚀 次回

次回(第6回)は、Step Functions + Lambdaで、AWSの処理を自動化していきます!
https://qiita.com/shota1212/items/6a6fb8f9addf6eb152a0


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?