概要
SageMakerのプロセッシングやトレーニングを実行した際の成果物や利用ファイルなどの出力先のS3の変更メモ。
- sagemaker==2.191.0
結論
Session
オブジェクトを生成し、デフォルトバケットなどを変更して、Estimator
やProcessor
などに渡してあげる。
追記
以下を見る限り設定ファイルでデフォルトを変えれそうな感じ。
https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk
経緯
ひとまずは、scikit-learnでやってみようと、ドキュメントを確認して進めていたが、変更できるものとできないものがあった。
- https://sagemaker.readthedocs.io/en/stable/overview.html#train-a-model-with-the-sagemaker-python-sdk
- https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html
- https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html
- https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-estimator
- https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#scikit-learn-processor
例えばProcessor
だとProcessingOutput
のdestination
で、成果物の出力先は変更できる。ただ、code
に指定する実行する処理が書かれたソースファイルは、出力先は指定できずデフォルトバケットになってしまう。
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
sklearn_processor = SKLearnProcessor(
framework_version="0.20.0",
role="[Your SageMaker-compatible IAM role]",
instance_type="ml.m5.xlarge",
instance_count=1,
)
sklearn_processor.run(
code="preprocessing.py",
inputs=[
ProcessingInput(
source="s3://your-bucket/path/to/your/data",
destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(
output_name="train_data",
source="/opt/ml/processing/train",
destination="s3://your-bucket/path/to/your/data/processed/train"),
ProcessingOutput(
output_name="test_data",
source="/opt/ml/processing/test",
destination="s3://your-bucket/path/to/your/data/processed/test"),
],
)
ネットで検索してみるが、いまいち見つからない。PipelineVariableを使えばできるようなことも書かれていたが、上手くいかんし、そもそもまだパイプラインで実行していない。ネットで探し回るのが面倒になったので、もうソース見るかということで、SageMaker Python SDKのソースを読み始める。
以下の部分で、codeに指定されているソースファイルをS3にアップロードしていた。
# ScriptProcessor._normalize_argsの処理から呼び出される_upload_codeメソッドから抜粋
if _pipeline_config and _pipeline_config.code_hash:
desired_s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
self.sagemaker_session.default_bucket_prefix,
_pipeline_config.pipeline_name,
self._CODE_CONTAINER_INPUT_NAME,
_pipeline_config.code_hash,
)
else:
desired_s3_uri = s3.s3_path_join(
"s3://",
self.sagemaker_session.default_bucket(),
self.sagemaker_session.default_bucket_prefix,
self._current_job_name,
"input",
self._CODE_CONTAINER_INPUT_NAME,
)
return s3.S3Uploader.upload(
local_path=code,
desired_s3_uri=desired_s3_uri,
kms_key=kms_key,
sagemaker_session=self.sagemaker_session,
ということで、sagemaker_sessionのデフォルトバケットを変えるしかないということにたどり着く。
どうやって変えればいいのかと、ひとまずドキュメントを見てみる。
https://sagemaker.readthedocs.io/en/stable/api/utility/session.html
インスタンス生成時のdefault_bucketなどで指定できるようなのとローカル実行も対応したいので、Session、LocalSessionに対応し、最終的には以下のような形になった。
def generate_sagemaker_session(instance_type: str):
s3_bucket = get_s3_bucket()
s3_prefix = generate_s3_bucket_prefix_for_job()
if instance_type in ("local", "local_gpu"):
return LocalSession(default_bucket=s3_bucket, default_bucket_prefix=s3_prefix)
else:
return Session(default_bucket=s3_bucket, default_bucket_prefix=s3_prefix)
instance_type = "ml.m5.xlarge"
sklearn_processor = SKLearnProcessor(
framework_version="0.20.0",
role="[Your SageMaker-compatible IAM role]",
instance_type=instance_type,
instance_count=1,
sagemaker_session=generate_sagemaker_session(instance_type),
)
sklearn_processor.run(
code="preprocessing.py",
inputs=[
ProcessingInput(
source="s3://your-bucket/path/to/your/data",
destination="/opt/ml/processing/input"),
],
outputs=[
ProcessingOutput(
output_name="train_data",
source="/opt/ml/processing/train"),
ProcessingOutput(
output_name="test_data",
source="/opt/ml/processing/test"),
],
)
これで、今のところ全てのファイルを想定しているS3バケットに出力することができた。
トレーニングも同じ方法で変更できた。
instance_type = "ml.m5.xlarge"
estimator = SKLearn(
entry_point='./training.py',
py_version='py3',
framework_version='0.23-1',
instance_count=1,
instance_type=instance_type,
role=get_role(),
sagemaker_session=generate_sagemaker_session(instance_type),
)
estimator.fit()