##背景
仕事の中でAWSのGlueを使う機会があり、その際に指定したS3からデータを読み込み、Glueでデータを加工して別の指定したS3バケットにデータを出力することを行ったので、その流れと方法をメモがてら残しておこうと思います。
##S3(Input)->Glue->S3(Output)
###引数(パラメータ)を取得
# パラメータを取得
args = getResolvedOptions(sys.argv, [
'JOB_NAME', 'source_path', 'out_path'])
# 各パラメータを変数におく
source_path = args['source_path']
out_path = args['out_path']
今回はLambdaから2つの引数の'source_path', 'out_path'を受け取る.
###S3からデータを受け取り
# 指定したS3のバケット配下のファイルをすべて読み込む
DataSource0 = glueContext.create_dynamic_frame_from_options(
connection_type = "s3",
connection_options = {'paths': source_path, 'recurse': True},
format="json")
引数で受け取ったsource_pathを入れ、指定したバケットのパスの配下にあるファイルを全て読み込む
###別のS3バケットに出力する
# 指定したS3のバケットにファイルを出力する
df.write.options(header=True, codec="gzip")\
.mode('overwrite').csv(out_path)
指定したバケットのパスへcsvの形式でgzipに圧縮してから出力する
##Lambdaで引数をつけてGlueを起動する
from logging import getLogger, INFO
from datetime import datetime, timedelta, timezone
from app.libs.sns.sns_client import SnsClient
import boto3
import os
import json
logger = getLogger(__name__)
logger.setLevel(INFO)
# 起動するglueのジョブ名
# JOB_NAMEは環境変数
JOB_NAME = os.environ['JOB_NAME']
# エラー通知のSNSトピック
ERROR_PUBLISH_SNS_ARN = os.environ['ERROR_PUBLISH_SNS_ARN']
class TriggerJob:
def __init__(self):
self.sns = SnsClient()
def handler(self, event):
start_arg = {}
try:
# Glueのジョブを実行する
start_arg = {
'--out_path': "<指定のアウトプットするs3のパス>"
'--source_path': "<指定のインプットするs3のパス>"
}
glue = boto3.client('glue')
# glueJob起動
response = glue.start_job_run(
JobName=JOB_NAME,
AllocatedCapacity=2,
Arguments=start_arg)
logger.info("response:{}".format(response))
return response
except Exception as err:
logger.error("Glueジョブの起動でエラーが発生しました。err: {}".format(err))
##最後に
今回はlambdaでGlueを指定のS3から入力と出力をするコードを載せました。
実際は他の処理とかもあり、関係ない部分は削除しておりますが、不要なものがまだあるかもしれないですがご了承ください。
ご参考になれば幸いです。