前回Pythonを用いたETL処理を行ってみた。
(学習メモ)Footballデータを使用したPythonでのETL処理及び分析
今回は、AWSを使用してデータパイプラインの構築を行ってみたい。
今回の学習内容
・データレイクとしてS3を用い、データを配置
・AWS Glueを用いてETL処理
・Amazon RedShiftでデータマートを作成
1. S3 に元データをアップロード
1.1 S3バケットの作成
項目 | 設定内容 |
---|---|
パケット名 | 任意のバケット名 |
リージョン | ap-northeast-1(東京) |
パブリックアクセスをブロック | すべてONのまま |
暗号化 | デフォルトのS3管理キー |
1.2 データのアップロード
今回はすでにcsvファイルをローカルに用意してあるので、
「アップロード」から手動でデータをアップロード。
2.GlueでETL 処理
2.1 Glueジョブを作成
GlueジョブはサーバレスでETL処理を実装できる機能。
左メニューのETL jobsから、jobを作成。
Visual ETL、Notebook、Script editorの3つあるが、今回はScript editorで作成。
Python、Ray、Sparkが選択でき、今回はPythonを使用する。
スクリプトが記述できる画面となる。
2.2 job detailsタブからjobの設定を行う。
今回は下記の設定で進める。
項目 | 設定内容 |
---|---|
名前 | Footballdata-ETL |
IAMロール | ETL-test-role |
IAMロールは事前にETL-test-role(任意の名前)で作成済み。
ETL-test-roleでは下記のポリシーを事前に作成済み。
・AmazonS3FullAcceess
・AWSGlueServiseRole
2.3 Glueジョブを実行
#各種ライブラリをインポート
import numpy as np
import pandas as pd
import boto3
#データウェアハウスへのデータ
dest_s3_file = "s3://sho-etlpractice-datawarehouse/footballdata/footballdata-etl.csv"
df_game_2022 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2022.csv"
df_game_2023 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2023.csv"
df_game_2024 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2024.csv"
#ETL処理
df_game_2022 = pd.read_csv(df_game_2022)
df_game_2023 = pd.read_csv(df_game_2023)
df_game_2024 = pd.read_csv(df_game_2024)
#3年分を結合
df_game_all_season = pd.concat([df_game_2022, df_game_2023, df_game_2024], ignore_index=True)
# 日付をdatetime型に変更
df_game_all_season["date_GMT"] = pd.to_datetime(df_game_all_season["date_GMT"], format="%b %d %Y - %I:%M%p")
# そこから年を抜き出し、新たな列に格納する
df_game_all_season["year"] = df_game_all_season["date_GMT"].dt.year
#ホームチーム得点とアウェイチーム得点の値を結合し「スコア」として新たな列に入れる
df_game_all_season["score"] = df_game_all_season["home_team_goal_count"].astype(str) + "-" + df_game_all_season["away_team_goal_count"].astype(str)
df_game_all_season[["home_team_goal_count","away_team_goal_count","score"]].head()
# Game Weekの列名を修正
df_game_all_season = df_game_all_season.rename(columns={'Game Week': 'game_week'})
# チーム名を修正
df = pd.DataFrame({
'team_name': ['Tokyo','Urawa Reds','Yokohama']
})
mapping = {
'Tokyo': 'FC Tokyo',
'Urawa Reds':'Urawa Red Diamonds',
'Yokohama':'Yokohama FC'
}
# replace()を使って複数の置換を一括実施
df_game_all_season['home_team_name'] = df_game_all_season['home_team_name'].replace(mapping)
df_game_all_season['away_team_name'] = df_game_all_season['away_team_name'].replace(mapping)
#必要なカラムだけにする
df_game_all_season = df_game_all_season[[
'year', # 年
'home_team_name', # ホームチーム
'away_team_name', # アウェイチーム
'game_week', # 節
'home_team_goal_count', # ホームチーム得点
'away_team_goal_count', # アウェイチーム得点
'score', # スコア
'home_team_corner_count', # ホームチーム コーナーキック数
'away_team_corner_count', # アウェイチーム コーナーキック数
'home_team_shots', # ホームチーム シュート数
'away_team_shots', # アウェイチーム シュート数
'home_team_shots_on_target', # ホームチーム枠内シュート数
'away_team_shots_on_target', # アウェイチーム枠内シュート数
'home_team_fouls', # ホームチーム ファウル数
'away_team_fouls', # アウェイチーム ファウル数
'home_team_possession', # ホームチーム ポゼッション率
'away_team_possession' # アウェイチーム ポゼッション率
]]
#結果をS3にロード
df_game_all_season.to_csv(dest_s3_file, index=False)
2.4 実行の確認
Runsタブで状況を確認できる。
Falledの場合、エラー文が表示されるが、Error logsからCloudWatchに遷移し、エラーの詳細が確認できる。
エラーと対応内容
下記のようなGlue ETLのジョブ実行エラーが発生。
PermissionError: Anonymous users cannot invoke this API. Please authenticate.
一見すると、S3へのアクセス権限のエラーのようだったので、以下を実施して調査。
まず、コードの中で、S3からのファイル読み込み処理は残し、S3へのロード処理をコメントアウトしジョブ実行したところ、ジョブは成功。つまり、S3じたとは正常にやり取りができる状態である。そのため、ロード先のフォルダに何か問題があると推察。
原因:
対象のフォルダを確認したところ単純にETLコード内で指定したフォルダ名とS3のフォルダ名が一致していなかった。S3のフォルダ名がスペルミスだったので、そちらを修正。
3. Redshift でデータをロード
事前にロールの作成と下記ポリシーを割り当て済み。
・AmazonRedshiftFullAccess
。RedshiftS3AccessPolicy
3.1 Redshift クラスターを作成
項目 | 設定内容 |
---|---|
名前 | sho-footballdata-cluster(任意の名前) |
ノードタイプ | dc2.large |
ノード数 | 1 |
クラスターの選択について
今回はミニマムサイズのdc2.largeを選択。
クラスターについては、ストレージ+コンピュートで料金が発生するが、
dc2.largeストレージの料金かからない模様。
そのため、停止しておけば料金はかからない。
3.2 テーブル作成
まずはテーブルを作成。
以下の CREATE TABLE を実行。
Glueで作成したデータ構造に合わせたものとする。
DROP TABLE IF EXISTS game_all_season;
CREATE TABLE game_all_season (
year INT, -- 年
home_team_name VARCHAR(100), -- ホームチーム
away_team_name VARCHAR(100), -- アウェイチーム
game_week INT, -- 節
home_team_goal_count INT, -- ホームチーム得点
away_team_goal_count INT, -- アウェイチーム得点
score VARCHAR(10), -- スコア
home_team_corner_count INT, -- ホームチーム コーナーキック数
away_team_corner_count INT, -- アウェイチーム コーナーキック数
home_team_shots INT, -- ホームチーム シュート数
away_team_shots INT, -- アウェイチーム シュート数
home_team_shots_on_target INT, -- ホームチーム枠内シュート数
away_team_shots_on_target INT, -- アウェイチーム枠内シュート数
home_team_fouls INT, -- ホームチーム ファウル数
away_team_fouls INT, -- アウェイチーム ファウル数
home_team_possession DECIMAL(5,2), -- ホームチーム ポゼッション率(例: 55.2)
away_team_possession DECIMAL(5,2)
);
3.3 S3からデータをロード
AWS Glueでの処理を経てS3にデータを保存済みなので、S3からRedshiftにロードをしてみる。
・COPY コマンドで S3 からロード
・IAM_ROLEは対象のロールのarnをそのままコピーしてくる。
COPY game_all_season
FROM 's3://sho-etlpractice-processed/footballdata/'
IAM_ROLE 'arn:aws:iam::123456789012:role/ロール名'
FORMAT AS CSV
IGNOREHEADER 1;
COPY時のエラー対応
エラーその1:
S3ServiceExceptionエラーが発生。
no identity-based policy allows the s3
原因:
RedshiftCustomRole に S3 への適切なアクセス権限がない。
対応:
ロールにインラインポリシーを作成
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::S3フォルダ名
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject"
],
"Resource": "arn:aws:s3:::S3フォルダ名/*"
}
]
}
備考:インラインポリシーでは1つのIAMロールのみに適用される。
エラーその2:
ERROR: Load into table 'game_all_season' failed. Check 'stl_load_errors' system table for details.
下記でエラーを確認
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;
エラー結果
Invalid digit, Value 'F', Pos 0, Type: Integer
原因:
ファイルの中身を直接確認してみたところ、1列目に意図せずインデックスが入っていた。
そのため、各列が1つずつズレており、型が一致しないというエラーが発生していた。
対応:
GlueのETL処理でS3へのロード時の処理をインデックスを含めない形に変更。
df_game_all_season.to_csv(dest_s3_file, index=False)
3.3 Redshift でデータマートを作成
最後にデータマートを作成してみる。
作成方法は通常のテーブル作成と同様。
内容としては、各チーム毎n全試合のスコアをはじめとする試合詳細を集計。
DROP TABLE IF EXISTS game_summary_mart;
CREATE TABLE game_summary_mart AS
SELECT
year,
game_week,
home_team_name,
score,
home_team_possession AS possession,
CASE
WHEN home_team_goal_count > away_team_goal_count THEN '勝利'
WHEN home_team_goal_count < away_team_goal_count THEN '負け'
ELSE '引き分け'
END AS result,
CASE
WHEN home_team_goal_count > away_team_goal_count THEN 3
WHEN home_team_goal_count < away_team_goal_count THEN 0
ELSE 1
END AS points,
'ホーム' AS home_or_away
FROM game_all_season
UNION ALL
SELECT
year,
game_week,
away_team_name,
score,
away_team_possession AS possession,
CASE
WHEN away_team_goal_count > home_team_goal_count THEN '勝利'
WHEN away_team_goal_count < home_team_goal_count THEN '負け'
ELSE '引き分け'
END AS result,
CASE
WHEN away_team_goal_count > home_team_goal_count THEN 3
WHEN away_team_goal_count < home_team_goal_count THEN 0
ELSE 1
END AS points,
'アウェイ' AS home_or_away
FROM game_all_season
ORDER BY year,game_week,home_or_away DESC;
データの確認
SELECT *
FROM game_summary_mart
結果
念の為、件数を確認しておく。
SELECT count(*)
FROM game_summary_mart
結果
1984行
2022年:18チーム ✖️ 34試合
2023年:18チーム ✖️ 34試合
2024年:20チーム ✖️ 38試合
合計:1984
抜け漏れ、重複なくデータマートが作成できたことを確認できた。
以上