0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

SageMaker Python SDKのプロセッシングなどの出力先バケットの変更

Last updated at Posted at 2023-10-11

概要

SageMakerのプロセッシングやトレーニングを実行した際の成果物や利用ファイルなどの出力先のS3の変更メモ。

  • sagemaker==2.191.0

結論

Sessionオブジェクトを生成し、デフォルトバケットなどを変更して、EstimatorProcessorなどに渡してあげる。

追記
以下を見る限り設定ファイルでデフォルトを変えれそうな感じ。
https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk

経緯

ひとまずは、scikit-learnでやってみようと、ドキュメントを確認して進めていたが、変更できるものとできないものがあった。

例えばProcessorだとProcessingOutputdestinationで、成果物の出力先は変更できる。ただ、codeに指定する実行する処理が書かれたソースファイルは、出力先は指定できずデフォルトバケットになってしまう。

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role="[Your SageMaker-compatible IAM role]",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source="s3://your-bucket/path/to/your/data",
            destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/train",
            destination="s3://your-bucket/path/to/your/data/processed/train"),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/test",
            destination="s3://your-bucket/path/to/your/data/processed/test"),
    ],
)

ネットで検索してみるが、いまいち見つからない。PipelineVariableを使えばできるようなことも書かれていたが、上手くいかんし、そもそもまだパイプラインで実行していない。ネットで探し回るのが面倒になったので、もうソース見るかということで、SageMaker Python SDKのソースを読み始める。

以下の部分で、codeに指定されているソースファイルをS3にアップロードしていた。

# ScriptProcessor._normalize_argsの処理から呼び出される_upload_codeメソッドから抜粋

if _pipeline_config and _pipeline_config.code_hash:
    desired_s3_uri = s3.s3_path_join(
        "s3://",
        self.sagemaker_session.default_bucket(),
        self.sagemaker_session.default_bucket_prefix,
        _pipeline_config.pipeline_name,
        self._CODE_CONTAINER_INPUT_NAME,
        _pipeline_config.code_hash,
    )
else:
    desired_s3_uri = s3.s3_path_join(
        "s3://",
        self.sagemaker_session.default_bucket(),
        self.sagemaker_session.default_bucket_prefix,
        self._current_job_name,
        "input",
        self._CODE_CONTAINER_INPUT_NAME,
    )
return s3.S3Uploader.upload(
    local_path=code,
    desired_s3_uri=desired_s3_uri,
    kms_key=kms_key,
    sagemaker_session=self.sagemaker_session,

ということで、sagemaker_sessionのデフォルトバケットを変えるしかないということにたどり着く。

どうやって変えればいいのかと、ひとまずドキュメントを見てみる。
https://sagemaker.readthedocs.io/en/stable/api/utility/session.html

インスタンス生成時のdefault_bucketなどで指定できるようなのとローカル実行も対応したいので、Session、LocalSessionに対応し、最終的には以下のような形になった。

def generate_sagemaker_session(instance_type: str):
    s3_bucket = get_s3_bucket()
    s3_prefix = generate_s3_bucket_prefix_for_job()

    if instance_type in ("local", "local_gpu"):
        return LocalSession(default_bucket=s3_bucket, default_bucket_prefix=s3_prefix)
    else:
        return Session(default_bucket=s3_bucket, default_bucket_prefix=s3_prefix)

instance_type = "ml.m5.xlarge"

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role="[Your SageMaker-compatible IAM role]",
    instance_type=instance_type,
    instance_count=1,
    sagemaker_session=generate_sagemaker_session(instance_type),
)

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source="s3://your-bucket/path/to/your/data",
            destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/train"),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/test"),
    ],
)

これで、今のところ全てのファイルを想定しているS3バケットに出力することができた。
トレーニングも同じ方法で変更できた。

instance_type = "ml.m5.xlarge"
estimator = SKLearn(
    entry_point='./training.py',
    py_version='py3',
    framework_version='0.23-1',
    instance_count=1,
    instance_type=instance_type,
    role=get_role(),
    sagemaker_session=generate_sagemaker_session(instance_type),
)
estimator.fit()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?