🎮 はじめに
これまで、
- スマブラ対戦データ
- 気象データ
をローカルから収集し、AWS S3に保存してきました。
今回はいよいよ、AWS GlueとPySparkを使って、
これらのデータを結合・加工するETLパイプラインを構築していきます!
これにより、最終的に
- キャラクターごとの勝率
- キャラ間の相性
- 天候や時間帯ごとの勝率
を分析できるデータが完成します!
🛠 使用技術
- AWS Glue
- PySpark(Glueジョブ内で使用)
- AWS S3(データレイク・データマート)
- AWS IAM(Glue用ロール設定)
🧱 ETLパイプライン全体像
今まで、上記図のローカル環境の部分の開発を進めてきました。
今回は赤線で囲った、データレイク → データウェアハウス → データマート間のETL処理を作っていきます。
データの流れとしては、Crawlerを使ってS3のデータを収集しData Catalogに登録、Data Catalogのデータを元にETLジョブ(pyspark)でデータを加工し、後続のS3に保存します。
この流れを図のように繰り返します。
🛠 実装
Crawler、Data Catalog、ETLジョブ(pyspark)の解説を進めていきます。
①Crawlerの作成
Crawler(クローラー)は、S3のデータソースをスキャンして、スキーマを推測し、Glue Data Catalogにテーブルを作成してくれるサービスです。
右上のCreate crawlerから作成することができます!
②Data Catalog
Glue Data Catalogは、AWSにおける「データのメタデータ(データの説明書き)」を管理するサービスです。
ざっくりいうと、
- どこに(S3、RDSなど)
- どんな形式で(CSV、Parquet、JSONなど)
- どんなカラムが(カラム名・型) 保存されているか?を登録・管理するための台帳・辞書です。
テーブルが増えてスキーマが増えてくると、どのスキーマが何を表しているか分からなくなってきます。
そういった悩みを解決してくれるのがデータカタログです!
③【ETLジョブ】データレイク → データウェアハウス の処理
次にデータレイク → データウェアハウス間の処理を書いていきます。
ここでは、上記図赤線の日付を共通の形式(yyyy/mm/dd hh:mm)に変換していきます。
実際のコードは以下になります。
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col, date_format
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from awsglue.dynamicframe import DynamicFrame
import sys
# UDF (User Defined Function) の定義
def adjust_datetime(date, time):
if time == 24:
new_time = "00:00"
new_date = datetime.strptime(date, "%Y-%m-%d") + timedelta(days=1)
new_date = new_date.strftime("%Y-%m-%d")
else:
new_time = f"{int(time):02}:00" # 時間を2桁の形式に固定し、分を追加
new_date = date
return f"{new_date} {new_time}"
# UDFの登録
adjust_datetime_udf = udf(adjust_datetime, StringType())
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
# データカタログからデータフレームを読み込む(ここは実際にデータカタログにある名前を指定)
dyf_django = glueContext.create_dynamic_frame.from_catalog(database="smash_datalake", table_name="smash_datalakedjango_data_export_csv")
dyf_weather = glueContext.create_dynamic_frame.from_catalog(database="smash_datalake", table_name="smash_datalakepreterite_weather_export_csv")
df_django = dyf_django.toDF()
df_weather = dyf_weather.toDF()
# Djangoデータの変換: match_timeのフォーマット変更
df_django_transformed = df_django.withColumn("match_time", date_format("match_time", "yyyy-MM-dd HH:mm"))
# Weatherデータの変換: dateとtimeの結合とフォーマット変更、元のdateとtimeカラムを削除
df_weather_transformed = df_weather.withColumn("datetime", adjust_datetime_udf(col("date"), col("time"))) \
.drop("date") \
.drop("time")
# datetimeカラムを最前列に移動
column_order = ["datetime"] + [col_name for col_name in df_weather_transformed.columns if col_name != "datetime"]
df_weather_transformed = df_weather_transformed.select(*column_order)
# S3(データウェアハウス)に保存
df_django_transformed.write.mode('overwrite').option("header", "true").csv("s3://your-bucket-name/datawarehouse/GameResults/")
df_weather_transformed.write.mode('overwrite').option("header", "true").csv("s3://your-bucket-name/datawarehouse/Weather/")
sc.stop()
全体の流れとしては、
1. データカタログのデータを、データフレームに読み込み
2. フォーマットを変更(Weatherデータは日付と時間が分かれているのでマージする)
3. S3(データウェアハウス)に保存
となります!
④【ETLジョブ】データウェアハウス → データマート の処理
次にデータレイク → データウェアハウス間の処理を書いていきます。
ここでは、以下の3つのデータマートテーブルを作成しています。
- CharacterPerformance(個々のキャラクターのパフォーマンス)
- CharacterMatchupPerformance(二つのキャラクター間のマッチアップのパフォーマンス)
- WeatherImpactOnWinRates(気象情報と勝率の関係を確認)
実際のコードは以下になります。
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, count, sum, when, concat, lit, avg, date_format, hour, coalesce, first
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# データカタログからテーブルを読み込む
dyf_game_results = glueContext.create_dynamic_frame.from_catalog(
database="smash_data_warehouse",
table_name="gameresults"
)
dyf_character_master = glueContext.create_dynamic_frame.from_catalog(
database="smash_data_warehouse",
table_name="charactermaster"
)
dyf_weather = glueContext.create_dynamic_frame.from_catalog(
database="smash_data_warehouse",
table_name="weather"
)
# DataFrameに変換
df_game_results = dyf_game_results.toDF()
df_character_master = dyf_character_master.toDF()
df_weather = dyf_weather.toDF()
###############CharacterPerformanceテーブルの作成###############
df_performance = df_game_results.join(df_character_master, df_game_results.my_character_id == df_character_master.character_id)
df_performance = df_performance.groupBy("character_id", "character_name").agg(
count("*").alias("matches_played"),
sum(when(col("win_or_loss") == "win", 1).otherwise(0)).alias("wins"),
sum(when(col("win_or_loss") == "loss", 1).otherwise(0)).alias("losses")
)
df_performance = df_performance.withColumn("win_rate", col("wins") / col("matches_played"))
df_performance = df_performance.select(
"character_id",
"character_name",
"matches_played",
"wins",
"losses",
"win_rate"
)
output_path = "s3://your-bucket-name/datamart/CharacterPerformance"
df_performance.write.mode("overwrite").option("header", "true").csv(output_path)
###############CharacterMatchupPerformanceテーブルの作成###############
# DataFrameを使ったジョイン
df_joined = df_game_results.alias("results")
df_characters = df_character_master.alias("characters")
# 自キャラクターの名前を結合
df_joined = df_joined.join(
df_characters,
df_joined.my_character_id == df_characters.character_id,
"left"
).select(
"results.*", col("characters.character_name").alias("my_character_name")
)
# 相手キャラクターの名前を結合
df_joined = df_joined.join(
df_characters,
df_joined.opponent_character_id == df_characters.character_id,
"left"
).select(
"results.*", "my_character_name", col("characters.character_name").alias("opponent_character_name")
)
# マッチアップIDを生成
df_joined = df_joined.withColumn(
"matchup_id",
concat(col("my_character_id"), lit("-"), col("opponent_character_id"))
)
# 集計処理
df_stats = df_joined.groupBy("matchup_id", "my_character_name", "opponent_character_name").agg(
count("*").alias("matches_played"),
sum(when(col("win_or_loss") == "win", 1).otherwise(0)).alias("wins"),
sum(when(col("win_or_loss") == "loss", 1).otherwise(0)).alias("losses")
)
# 勝率を計算
df_stats = df_stats.withColumn("win_rate", col("wins") / col("matches_played"))
# 結果をS3にCSV形式で保存
output_path_matchup = "s3://your-bucket-name/datamart/CharacterMatchupPerformance/"
df_stats.write.mode("overwrite").option("header", "true").csv(output_path_matchup)
###############WeatherImpactOnWinRatesテーブルの作成##################
# 'date_hour' カラムを追加('yyyy/MM/dd HH'形式)
df_game_results = df_game_results.withColumn("date_hour", date_format("match_time", "yyyy/MM/dd HH"))
df_weather = df_weather.withColumn("date_hour", date_format("datetime", "yyyy/MM/dd HH"))
# 'time_slot' カラムを生成
df_game_results = df_game_results.withColumn(
"time_slot",
when((hour("match_time") >= 4) & (hour("match_time") <= 11), "Morning")
.when((hour("match_time") >= 12) & (hour("match_time") <= 18), "Afternoon")
.otherwise("Evening")
)
# エイリアスを使用して結合
df_joined_weather = df_game_results.alias("game").join(
df_weather.alias("weather"),
col("game.date_hour") == col("weather.date_hour"),
"left"
)
# 結合後のデータから必要な情報を選択、降水量がnullの場合は0を使用
df_stats_weather = df_joined_weather.select(
col("game.date_hour").alias("date"),
col("game.time_slot").alias("time_slot"),
col("weather.temperature").alias("temperature"),
coalesce(col("weather.precipitation"), lit(0)).alias("precipitation"), # Nullの場合は0を使用
col("game.win_or_loss").alias("win_or_loss")
)
# 勝率の計算と集計
df_stats_weather = df_stats_weather.withColumn(
"wins", when(col("win_or_loss") == "win", 1).otherwise(0)
).withColumn(
"losses", when(col("win_or_loss") == "loss", 1).otherwise(0)
)
df_final_stats = df_stats_weather.groupBy("date", "time_slot").agg(
first("temperature").alias("temperature"),
first("precipitation").alias("precipitation"),
count("date").alias("matches_played"),
sum("wins").alias("wins"),
sum("losses").alias("losses"),
(sum("wins") / count("date")).alias("win_rate")
)
# 結果をS3にCSV形式で保存
output_path_weather = "s3://your-bucket-name/datamart/WeatherImpactOnWinRates"
df_final_stats.write.mode("overwrite").option("header", "true").csv(output_path_weather)
#######################################################################
sc.stop()
基本的にデータフレームワークを使っているので、Pandasを使ったことがある方であれば、すぐにイメージがつくコードかもしれません!
以上でGlue × PySparkでETLパイプラインを構築することができました。
悩んだところ
Glue ETLジョブなのですが、IAMの権限を設定しても権限エラーが消えませんでした。
これは、サービス単位で権限が付与されていないことが原因だったため、もしIAMの設定が問題ないのに権限エラーが出る場合は、AWSサポートに問い合わせてみてください!
🚀 次回
次回(第6回)は、Step Functions + Lambdaで、AWSの処理を自動化していきます!
https://qiita.com/shota1212/items/6a6fb8f9addf6eb152a0