0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

複数のGlueジョブで自作Pythonモジュールを使い回す方法

Posted at

AWS Glue でジョブを書いていると、「S3のオブジェクト一覧を取る処理」や「S3にオブジェクトをアップロードする処理」など、複数ジョブで同じ Python コードをコピペしたくなる場面がよくあります。

このような共通の処理をそれぞれの Glue Job に都度記述していると、

  • バグ修正のたびに全ジョブのスクリプトを直さないといけない
  • どのジョブにどのバージョンのコードが入っているか分からなくなる

といった運用のつらさが出てきます。

そこで、自作モジュールを zip にまとめて S3 に置き、複数の Glue ジョブから共通利用する方法を紹介します。

全体の流れ

やりたいことのイメージとしては以下の通りです。

  1. ローカル環境で共通モジュールを作成する
  2. 共通モジュールを zip 化して S3 にアップロードする
  3. Glue Job で共通モジュールを使用する設定をする

これで、1つの自作ライブラリを複数のジョブから共通利用できるようになります。

1. 共通モジュール (例: s3_utils) を作る

まずはローカルで、共通モジュール用のディレクトリを作ります。

% mkdir -p glue_lib/common
% cd glue_lib
% touch common/__init__.py
% touch common/s3_utils.py

今回の例では、「指定したプレフィックス以下のオブジェクトキーを一覧取得する関数」を用意してみます。

# common/s3_utils.py:

import boto3
from typing import List


def list_s3_keys(bucket: str, prefix: str) -> List[str]:
    """
    指定したバケット / プレフィックス配下のオブジェクトキー一覧を返す簡易ユーティリティ。

    :param bucket: S3 バケット名
    :param prefix: プレフィックス(例: 'logs/2025/01/01/'"""
    s3 = boto3.client("s3")
    keys: List[str] = []
    continuation_token = None

    while True:
        params = {
            "Bucket": bucket,
            "Prefix": prefix,
        }
        if continuation_token:
            params["ContinuationToken"] = continuation_token

        resp = s3.list_objects_v2(**params)
        for obj in resp.get("Contents", []):
            keys.append(obj["Key"])

        # 続きがなければ終了
        if not resp.get("IsTruncated"):
            break

        continuation_token = resp.get("NextContinuationToken")

    return keys

__init__.py は、「このフォルダは Python パッケージです」とPythonに知らせるためのファイルです。(中身は空で問題ありません。)

参考: The Python Tutorial 6. Modules

2. モジュールを zip 化して S3 にアップロードする

作成した common パッケージを zip に固めて、S3 にアップロードします。

# glue_lib ディレクトリの直下で実行
% zip -r common_utils.zip common
  adding: common/ (stored 0%)
  adding: common/__init__.py (stored 0%)
  adding: common/s3_utils.py (deflated 43%)

作成した common_utils.zip を S3 にアップロードします。

# glue_lib ディレクトリの直下で実行
% aws s3 cp common_utils.zip s3://your-bucket-name/glue_libs/common_utils.zip
upload: ./common_utils.zip to s3://your-bucket-name/glue_libs/common_utils.zip

バケット名やプレフィックスはお好みで指定してください。

3. Glue ジョブに zip ライブラリを読み込ませる設定

次に、ETL ジョブのマネジメントコンソールから、先ほどの zip をライブラリとして読み込む設定を行います。

  1. ETL ジョブの画面から、「Job details」タブをクリック
    Dagless_job_-_AWS_Glue_Studio.png

  2. 一番下の「Advanced properties」を開き、Libraries → Python library path に先ほどアップロードした zip の S3 パスを指定
    Dagless_job_-_AWS_Glue_Studio.png
    Dagless_job_-_AWS_Glue_Studio.png
    読み込みたいパッケージが複数ある場合は、s3://aaa/package_a.zip,s3://bbb/package_b.zip のように、カンマ区切りで S3 パスを記述してください。

動作確認

デモとして、オブジェクトの一覧をログに出すだけの2つのジョブ (Job A, Job B) を作成し、同じ list_s3_keys 関数を実際に使い回してみます。

Job A
import os
from common.s3_utils import list_s3_keys


def main():
    bucket = os.environ.get("TARGET_BUCKET", "your-bucket-name")
    prefix = os.environ.get("TARGET_PREFIX", "sample/input/")

    print(f"Job A: Listing objects in s3://{bucket}/{prefix}")
    keys = list_s3_keys(bucket, prefix)

    for k in keys:
        print(k)


if __name__ == "__main__":
    main()
Job B
import os
from common.s3_utils import list_s3_keys


def main():
    bucket = os.environ.get("TARGET_BUCKET", "your-bucket-name")
    prefix = os.environ.get("TARGET_PREFIX", "sample/input/")

    print(f"Job B: Listing objects in s3://{bucket}/{prefix}")
    keys = list_s3_keys(bucket, prefix)

    for k in keys:
        print(k)


if __name__ == "__main__":
    main()

それぞれのジョブを実行すると、共通の関数を実行してオブジェクトの一覧を取得していることが確認できました。

  • Job A のログ
    CloudWatch___ap-northeast-1.png
  • Job B のログ
    CloudWatch___ap-northeast-1.png

バージョンアップ時の運用イメージ

共通モジュールをこの形で運用しておくと、バージョンアップもシンプルにできます。

  1. ローカルの common/ 以下を修正、テストで動作確認
  2. zip -r common_utils.zip common で再 zip
  3. S3 上の common_utils.zip を更新
    更新の方法はいずれかの方法が考えられると思います。やりやすい方法を採用してください。
    1. S3 上の common_utils.zip を置き換え(同じ名称で上書きする。)
    2. バージョン付きパスでアップロード(例: common_utils_v2.zip にして、ジョブごとに切り替え)

これで、Glue ジョブ側のスクリプトは一切触らずに、共通処理だけを差し替えることができます。

まとめ

  • Python モジュールを zip 化して S3 に置くことで、Glue ジョブ間で共通コードを安全に共有できる
  • 「Python library path」を指定するだけで import が可能
  • バージョンアップは zip の差し替え or バージョニングで運用できる

Glue ジョブが増えるほど威力を発揮すると思います。お役に立ちましたら幸いです。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?