はじめに
いろんな形式のデータを他システムに連携するため、glueを使用することになったので、経験して詰まったところを忘れないようにメモ
Glueとは?
- サーバーレスのETLサービス(詳細なことはGlueのマニュアルを見て理解して)
- 分析用(Athena)用のデータレイクの作成
実際にやってみた!
- Glueジョブの作成
- type: Spark
- Language: python3
- worker数: 10
- worker type: G1X
- 作成したGlueジョブを実行
実際に使用した関数の例
S3バケットのファイルを別のS3バケットに格納
- 連携元のS3バケットのファイルはクローリング済み、コネクション作成済み
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
from awsglue.dynamicframe import DynamicFrame
from datetime import timedelta, timezone
# Job の初期化
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'WORKFLOW_NAME',
])
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
try:
# DynamicFrameの作成
datasource1 = glueContext.create_dynamic_frame.from_catalog(
database={コネクションDB名}, table_name={コネクションテーブル名}, transformation_ctx="datasource1")
# Dataframe変換
df = datasource1.toDF()
# カラムの型変換
df = df.withColumn("test", "string")
# 変換後のカラムを取得
df_schema1 = df.schema.fields
# parquetでS3バケットに書き込み
df.write.mode('overwrite').parquet({格納先のS3バケット})
except Exception as e:
# 処理落ちした行数取得
exception_traceback = sys.exc_info()[2]
err_line_number = exception_traceback.tb_lineno
関数の説明
- create_dynamic_frame.from_catalog
- コネクション経由でS3バケット内のファイルをダイナミックフレームとして取得
- toDF
- ダイナミックフレームをデータフレームに変換
- データフレームのカラム名を変換(引数に配列を設定し、一括で変換する用途で使用)
- withColumn
- データフレームのカラムの型を変更するときに使用
- 第一引数に型を変換したいカラム名、第二引数に変換したい型を指定
- schema.fields
- データフレームのカラムの名前と型を連想配列で取得
- mode('overwrite').parquet({格納先のS3バケット})
- 格納先のS3バケットにparquet形式でファイルを格納
- mode('overwrite')を指定することでファイルを上書きする
Glueでできること
- DBのテーブルデータを別のDBもしくはS3バケットにファイルとして連携
- JDBCコネクションもできるが、コネクションを利用して接続することも可能(詳しくはGlueのリファレンスを参照)
- S3バケット内のファイルを別S3バケットにCSVもしくはparquet形式でデータを連携
- aws-sdkを使用できるので、sdkを利用してできることは可能だが、複雑なロジックはお薦めしない
- 場合によってはジョブのスペックに限らず、処理が固まったり、不整合が発生し、それ用にカスタムロジックを組み込まなければいけないなど、大幅にコストがかかる可能性があるので、そこは注意
注意点
- 現状の構成でpandasを用いるとGlueジョブに高負荷をかけすぎるとJOB自体が固まって、エラーになることがある
- 基本的に連携元と連携先を1:1の関係で1つのジョブを作成すると思うが、S3バケットにパラメータファイルを定義しておけば1つのジョブで複数の連携先もしくは連携元にデータ連携可能にはなるが、その分汎用的なロジックを作成するため、リスクがあったりコストがかかる
- 連携したS3バケットのファイルのデータをクローリングする場合、Glueがデータの中身を見て、型を決めるので、場合によっては意図した型にならない場合があるので、その場合は手運用もしくは変換用のスクリプトを作成するなどの工夫が必要
- 連携先をDBにする場合、glueのリファレンスにある関数を利用すると、TRUNCATE、INSERTを行う。取り扱うデータによっては連携先のDBに高負荷がかかるので、メモリーオーバフローを起こさないようなスペックが必要になる。
- 格納先のS3バケットにファイルを格納するとき、ファイル名はこちらで指定できないので、もしファイル名が指定したい場合は格納後、SDKを利用してファイルのリネームを行うスクリプトや手運用でのリネームが必要
今後
S3バケットのファイルを取り扱ったものを記事としてまとめたので、今度はDBを取り扱ったものを記事として書こうと思う