0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Lambdaから指定したS3に入出力するGlueを起動させる話

Posted at

##背景
仕事の中でAWSのGlueを使う機会があり、その際に指定したS3からデータを読み込み、Glueでデータを加工して別の指定したS3バケットにデータを出力することを行ったので、その流れと方法をメモがてら残しておこうと思います。

##構築内容

##S3(Input)->Glue->S3(Output)
###引数(パラメータ)を取得

# パラメータを取得
args = getResolvedOptions(sys.argv, [
                          'JOB_NAME', 'source_path', 'out_path'])
# 各パラメータを変数におく
source_path = args['source_path']
out_path = args['out_path']

今回はLambdaから2つの引数の'source_path', 'out_path'を受け取る.

###S3からデータを受け取り

# 指定したS3のバケット配下のファイルをすべて読み込む
DataSource0 = glueContext.create_dynamic_frame_from_options(
    connection_type = "s3",
    connection_options = {'paths': source_path, 'recurse': True},
    format="json")

引数で受け取ったsource_pathを入れ、指定したバケットのパスの配下にあるファイルを全て読み込む

###別のS3バケットに出力する

# 指定したS3のバケットにファイルを出力する
df.write.options(header=True, codec="gzip")\
    .mode('overwrite').csv(out_path)

指定したバケットのパスへcsvの形式でgzipに圧縮してから出力する

##Lambdaで引数をつけてGlueを起動する

from logging import getLogger, INFO
from datetime import datetime, timedelta, timezone
from app.libs.sns.sns_client import SnsClient
import boto3
import os
import json


logger = getLogger(__name__)
logger.setLevel(INFO)

# 起動するglueのジョブ名
# JOB_NAMEは環境変数
JOB_NAME = os.environ['JOB_NAME']
# エラー通知のSNSトピック
ERROR_PUBLISH_SNS_ARN = os.environ['ERROR_PUBLISH_SNS_ARN']


class TriggerJob:
    def __init__(self):
        self.sns = SnsClient()

    def handler(self, event):
        start_arg = {}
        try:
           
            # Glueのジョブを実行する
            start_arg = {
                '--out_path': "<指定のアウトプットするs3のパス>"
                '--source_path': "<指定のインプットするs3のパス>"
            }

            glue = boto3.client('glue')
            # glueJob起動
            response = glue.start_job_run(
                JobName=JOB_NAME,
                AllocatedCapacity=2,
                Arguments=start_arg)
            logger.info("response:{}".format(response))
            return response
        except Exception as err:
            logger.error("Glueジョブの起動でエラーが発生しました。err: {}".format(err))

##最後に
今回はlambdaでGlueを指定のS3から入力と出力をするコードを載せました。
実際は他の処理とかもあり、関係ない部分は削除しておりますが、不要なものがまだあるかもしれないですがご了承ください。
ご参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?