はじめに
AWS のサーバーレスデータ統合サービス AWS Glue を使ってみました
その中でも分かりやすそうな AWS Glue Studio で Example jobs を実行します
具体的には以下のようなことをします
- S3 上の CSV ファイル3つを結合する
- 特定の列だけ抽出する
- 各列を数値や論理型に変換する
- Parquet 形式で出力する
- 出力ファイルに対して Athena から SQL を実行する
Parquet はデータを効率的に保存・検索するためのファイル形式です
思ったよりすんなり行かなかったので記事に残しておきます
AWS Glue とは
データの ETL = Extract Transfer Load = 収集、変換、読込の一連処理を簡単に実装します
サーバーレスなので、インフラのことを気にする必要がありません
データのスキーマ(項目、型)も Glue が自動的に認識してくれます
AWS Glue Studio とは
AWS Glue の ETL をブラウザ上の GUI で組めます
IAM ロールの作成
ジョブを実行する IAM ロールを作成します
IAM ロールのページで「ロールを作成」をクリック
「信頼されたエンティティタイプ」に「AWS のサービス」を選択(デフォルト)
「ユースケース」に「Glue」を選択
「次へ」をクリック
許可ポリシーに以下のポリシーを選択し、「次へ」をクリック
- AWSGlueServiceRole
- AmazonS3FullAccess
今回は S3 バケット上の CSV ファイルを入力、 Parquet ファイルを出力とするため、 S3 へのアクセス権限を与えます
また、とりあえず FullAccess にしていますが、本来は必要最低限の権限にする必要があります
「ロールを作成」をクリック
S3 バケットの作成
出力先となる S3 バケットを作成します
すでにあるバケットを利用する場合は不要です
入力元は公開されているバケットを使用します
データベースの作成
AWS Glue では、扱うデータのメタデータ(構造の定義情報など)をデータカタログという機能で管理します
データカタログの機能の中で、メタデータの保存先となるのがデータベースです
ジョブを実行する前にデータベースを作成しておく必要があります
AWS Glue のページでサイドメニューから Data Catalog |> Databases を開きます
「Add database」をクリックします
「Name」に適当な名前を入力し、「Create database」をクリックします
ジョブの作成
AWS Glue のページでサイドメニューから ETL jobs |> Visual ETL をクリックします
AWS Glue Studio のページが開きます
Exsample jobs を開き、中の「Visual ETL job to join multiple sources」を選択して「Create example job」をクリックします
ジョブ定義の GUI が開きます
ジョブの設定
「Job details」のタブをクリックします
「Name」を適当な名前に変更します
「IAM Role」に作成しておいた IAM ロールを選択します
右上「Save」をクリックします
うまく行くと、「Run」ボタンが活性化して押せるようになります
でも、まだ押さないでください
出力先の指定
「Visual」のタブに戻ります
一番下にある「Catalog」のノードをクリックします
「S3 Target Location」に作成した S3 バケットを入力します
下の方にある「Database」で作成したデータベースを選択します
右上「Save」をクリックして保存しておきます
ジョブの確認
Data source
左上の「Plan assignment」をクリックすると、入力ファイルの設定が確認できます
下の「Data preview」でデータの内容(先頭200件)が確認できます
「S3 URL」の下、「View」ボタンをクリックするとS3上のファイルが確認できます
他の各入力ファイル(Data source)のCSVファイルは以下のような内容です
各 Data source のノードで「Output schema」を見ると、ジョブの実行前の時点で各列の定義ができていることが分かります
ちなみに、 S3 上のファイル以外に Kinesis や データベースなども入力にすることができます
Transform - Join
Join のノードを開くと、2つの入力ファイルを結合する定義を確認できます
ただし、「Data preview」は各入力ファイルの先頭200行同士を結合しているため、キーの一致するレコードが存在せず、空になってしまいます(仕様上の問題)
しかも、この状態になると、もう一段下の Join ノードで「plan_id」が選択できなくなっています
「Save」しないようにしましょう
Transform - SelectFields
「Select Fields」のノードをクリックすると、列の選択(いわゆる射影、SELECT句)定義が確認できます
ここまでで結合した入力ファイルから、指定した列だけを抜き出します
Transform - ChangeSchema
「Change Schema」のノードをクリックすると、各列の型変換定義が確認できます
ここまでは全列「string」でしたが、ここで「bigint」や「boolean」に変換されます
ちなみに、 Transform では他にも SQL による操作や欠損血処理などが可能です
Data target
改めて「Catalog」ノードを開くと、「Format」に「Parquet」を指定しており、 Parquet 形式に変換していることが分かります
Target としては各種データベースも指定可能です
スクリプトの確認
「Script」タブを開くと、 GUI で繋いだノードをスクリプトとして確認できます
ジョブの実行
いよいよ右上「Run」をクリックしてジョブを実行しましょう
ジョブの実行状況は「Runs」タブで確認できます
1分強で「Succeeded」になります
左メニュー Data Catalog |> Databases |> Tables をクリックします
テーブルの一覧に subscriber_plans テーブルが作成されています
テーブル名をクリックするとテーブル詳細が確認できます
出力先の S3 バケットを確認すると Parquet ファイルが作成されています
Athena からのデータ分析
Athena のページを開き、「クエリエディタを起動」をクリックします
「設定」タブを開き、「管理」ボタンをクリックします
「Location of query result」 に S3 バケットを指定します
データベースを指定すると、 Glue で作ったテーブルに対して SQL が実行できました
まとめ
AWS Glue を使うことで ETL ジョブが作れました
日次や月次のデータ変換バッチ処理を組むのには便利そう