0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

(学習メモ)AWSでのデータパイプライン構築

Posted at

前回Pythonを用いたETL処理を行ってみた。
(学習メモ)Footballデータを使用したPythonでのETL処理及び分析

今回は、AWSを使用してデータパイプラインの構築を行ってみたい。

今回の学習内容

・データレイクとしてS3を用い、データを配置
・AWS Glueを用いてETL処理
・Amazon RedShiftでデータマートを作成

1. S3 に元データをアップロード

1.1 S3バケットの作成

項目 設定内容
パケット名 任意のバケット名
リージョン ap-northeast-1(東京)
パブリックアクセスをブロック すべてONのまま
暗号化 デフォルトのS3管理キー

1.2 データのアップロード

今回はすでにcsvファイルをローカルに用意してあるので、
「アップロード」から手動でデータをアップロード。

スクリーンショット 2025-03-16 22.27.30.png

2.GlueでETL 処理

スクリーンショット 2025-03-16 22.29.31.png

2.1 Glueジョブを作成

GlueジョブはサーバレスでETL処理を実装できる機能。

左メニューのETL jobsから、jobを作成。
Visual ETL、Notebook、Script editorの3つあるが、今回はScript editorで作成。

スクリーンショット 2025-03-16 22.30.53.png

Python、Ray、Sparkが選択でき、今回はPythonを使用する。
スクリーンショット 2025-03-16 22.34.01.png

スクリプトが記述できる画面となる。

スクリーンショット 2025-03-16 22.35.33.png

2.2 job detailsタブからjobの設定を行う。

スクリーンショット 2025-03-16 22.37.34.png

今回は下記の設定で進める。

項目 設定内容
名前 Footballdata-ETL
IAMロール ETL-test-role

IAMロールは事前にETL-test-role(任意の名前)で作成済み。
ETL-test-roleでは下記のポリシーを事前に作成済み。
・AmazonS3FullAcceess
・AWSGlueServiseRole

2.3 Glueジョブを実行

#各種ライブラリをインポート
import numpy as np
import pandas as pd
import boto3

#データウェアハウスへのデータ
dest_s3_file = "s3://sho-etlpractice-datawarehouse/footballdata/footballdata-etl.csv"

df_game_2022 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2022.csv"
df_game_2023 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2023.csv"
df_game_2024 = "s3://sho-etlpractice-demo/footballdata/j1_matches_2024.csv"

#ETL処理

df_game_2022 = pd.read_csv(df_game_2022)
df_game_2023 = pd.read_csv(df_game_2023)
df_game_2024 = pd.read_csv(df_game_2024)

#3年分を結合
df_game_all_season = pd.concat([df_game_2022, df_game_2023, df_game_2024], ignore_index=True)

# 日付をdatetime型に変更
df_game_all_season["date_GMT"] = pd.to_datetime(df_game_all_season["date_GMT"], format="%b %d %Y - %I:%M%p")
# そこから年を抜き出し、新たな列に格納する
df_game_all_season["year"] = df_game_all_season["date_GMT"].dt.year


#ホームチーム得点とアウェイチーム得点の値を結合し「スコア」として新たな列に入れる
df_game_all_season["score"] = df_game_all_season["home_team_goal_count"].astype(str) + "-" + df_game_all_season["away_team_goal_count"].astype(str)
df_game_all_season[["home_team_goal_count","away_team_goal_count","score"]].head()


# Game Weekの列名を修正
df_game_all_season = df_game_all_season.rename(columns={'Game Week': 'game_week'})


# チーム名を修正
df = pd.DataFrame({
    'team_name': ['Tokyo','Urawa Reds','Yokohama']
})

mapping = {
    'Tokyo': 'FC Tokyo',
    'Urawa Reds':'Urawa Red Diamonds',
    'Yokohama':'Yokohama FC'
}

# replace()を使って複数の置換を一括実施
df_game_all_season['home_team_name'] = df_game_all_season['home_team_name'].replace(mapping)
df_game_all_season['away_team_name'] = df_game_all_season['away_team_name'].replace(mapping)


#必要なカラムだけにする
df_game_all_season = df_game_all_season[[
    'year',                      # 年
    'home_team_name',            # ホームチーム
    'away_team_name',            # アウェイチーム
    'game_week',                 # 節
    'home_team_goal_count',      # ホームチーム得点
    'away_team_goal_count',      # アウェイチーム得点
    'score',                     # スコア
    'home_team_corner_count',    # ホームチーム コーナーキック数
    'away_team_corner_count',    # アウェイチーム コーナーキック数
    'home_team_shots',           # ホームチーム シュート数
    'away_team_shots',           # アウェイチーム シュート数
    'home_team_shots_on_target', # ホームチーム枠内シュート数
    'away_team_shots_on_target', # アウェイチーム枠内シュート数
    'home_team_fouls',           # ホームチーム ファウル数
    'away_team_fouls',           # アウェイチーム ファウル数
    'home_team_possession',      # ホームチーム ポゼッション率
    'away_team_possession'       # アウェイチーム ポゼッション率
]]

#結果をS3にロード
df_game_all_season.to_csv(dest_s3_file, index=False)

2.4 実行の確認

Runsタブで状況を確認できる。

スクリーンショット 2025-03-16 22.53.44.png

Falledの場合、エラー文が表示されるが、Error logsからCloudWatchに遷移し、エラーの詳細が確認できる。

エラーと対応内容

下記のようなGlue ETLのジョブ実行エラーが発生。

PermissionError: Anonymous users cannot invoke this API. Please authenticate.

一見すると、S3へのアクセス権限のエラーのようだったので、以下を実施して調査。

まず、コードの中で、S3からのファイル読み込み処理は残し、S3へのロード処理をコメントアウトしジョブ実行したところ、ジョブは成功。つまり、S3じたとは正常にやり取りができる状態である。そのため、ロード先のフォルダに何か問題があると推察。

原因:
対象のフォルダを確認したところ単純にETLコード内で指定したフォルダ名とS3のフォルダ名が一致していなかった。S3のフォルダ名がスペルミスだったので、そちらを修正。

3. Redshift でデータをロード

スクリーンショット 2025-03-16 22.47.59.png

事前にロールの作成と下記ポリシーを割り当て済み。
・AmazonRedshiftFullAccess
。RedshiftS3AccessPolicy

3.1 Redshift クラスターを作成

スクリーンショット 2025-03-16 22.58.50.png

項目 設定内容
名前 sho-footballdata-cluster(任意の名前)
ノードタイプ dc2.large
ノード数 1

クラスターの選択について
今回はミニマムサイズのdc2.largeを選択。
クラスターについては、ストレージ+コンピュートで料金が発生するが、
dc2.largeストレージの料金かからない模様。
そのため、停止しておけば料金はかからない。

3.2 テーブル作成

まずはテーブルを作成。
以下の CREATE TABLE を実行。
Glueで作成したデータ構造に合わせたものとする。

DROP TABLE IF EXISTS game_all_season;
CREATE TABLE game_all_season (
    year INT,                          -- 年
    home_team_name VARCHAR(100),       -- ホームチーム
    away_team_name VARCHAR(100),       -- アウェイチーム
    game_week INT,                     -- 節
    home_team_goal_count INT,          -- ホームチーム得点
    away_team_goal_count INT,          -- アウェイチーム得点
    score VARCHAR(10),                 -- スコア
    home_team_corner_count INT,        -- ホームチーム コーナーキック数
    away_team_corner_count INT,        -- アウェイチーム コーナーキック数
    home_team_shots INT,               -- ホームチーム シュート数
    away_team_shots INT,               -- アウェイチーム シュート数
    home_team_shots_on_target INT,     -- ホームチーム枠内シュート数
    away_team_shots_on_target INT,     -- アウェイチーム枠内シュート数
    home_team_fouls INT,               -- ホームチーム ファウル数
    away_team_fouls INT,               -- アウェイチーム ファウル数
    home_team_possession DECIMAL(5,2), -- ホームチーム ポゼッション率(例: 55.2)
    away_team_possession DECIMAL(5,2)
);

3.3 S3からデータをロード

AWS Glueでの処理を経てS3にデータを保存済みなので、S3からRedshiftにロードをしてみる。

・COPY コマンドで S3 からロード
・IAM_ROLEは対象のロールのarnをそのままコピーしてくる。

COPY game_all_season
FROM 's3://sho-etlpractice-processed/footballdata/'
IAM_ROLE 'arn:aws:iam::123456789012:role/ロール名'
FORMAT AS CSV
IGNOREHEADER 1;

COPY時のエラー対応

エラーその1:
S3ServiceExceptionエラーが発生。

no identity-based policy allows the s3

原因:
RedshiftCustomRole に S3 への適切なアクセス権限がない。

対応:
ロールにインラインポリシーを作成

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::S3フォルダ名
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::S3フォルダ名/*"
        }
    ]
}

備考:インラインポリシーでは1つのIAMロールのみに適用される。

エラーその2:

ERROR: Load into table 'game_all_season' failed. Check 'stl_load_errors' system table for details.

下記でエラーを確認

SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;

エラー結果

Invalid digit, Value 'F', Pos 0, Type: Integer

原因:
ファイルの中身を直接確認してみたところ、1列目に意図せずインデックスが入っていた。
そのため、各列が1つずつズレており、型が一致しないというエラーが発生していた。

対応:
GlueのETL処理でS3へのロード時の処理をインデックスを含めない形に変更。

df_game_all_season.to_csv(dest_s3_file, index=False)

3.3 Redshift でデータマートを作成

最後にデータマートを作成してみる。
作成方法は通常のテーブル作成と同様。
内容としては、各チーム毎n全試合のスコアをはじめとする試合詳細を集計。

DROP TABLE IF EXISTS game_summary_mart;
CREATE TABLE game_summary_mart AS
SELECT 
   year,
   game_week,
   home_team_name, 
   score, 
   home_team_possession AS possession,
   CASE 
       WHEN home_team_goal_count > away_team_goal_count THEN '勝利'
       WHEN home_team_goal_count < away_team_goal_count THEN '負け'
       ELSE '引き分け'
   END AS result,
   CASE 
       WHEN home_team_goal_count > away_team_goal_count THEN 3
       WHEN home_team_goal_count < away_team_goal_count THEN 0
       ELSE 1
   END AS points,
   'ホーム' AS home_or_away
FROM game_all_season

UNION ALL

SELECT 
   year,
   game_week,
   away_team_name, 
   score, 
   away_team_possession AS possession,
   CASE 
       WHEN away_team_goal_count > home_team_goal_count THEN '勝利'
       WHEN away_team_goal_count < home_team_goal_count THEN '負け'
       ELSE '引き分け'
   END AS result,
   CASE 
       WHEN away_team_goal_count > home_team_goal_count THEN 3
       WHEN away_team_goal_count < home_team_goal_count THEN 0
       ELSE 1
   END AS points,
   'アウェイ' AS home_or_away
FROM game_all_season
ORDER BY year,game_week,home_or_away DESC;

データの確認

SELECT *
FROM game_summary_mart

結果

スクリーンショット 2025-03-17 10.32.28.png

念の為、件数を確認しておく。

SELECT count(*)
FROM game_summary_mart

結果

1984行

2022年:18チーム ✖️ 34試合
2023年:18チーム ✖️ 34試合
2024年:20チーム ✖️ 38試合
合計:1984

抜け漏れ、重複なくデータマートが作成できたことを確認できた。

以上

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?