はじめに
機械学習をする際に、学習データに前処理をしたいことってありますよね。機械学習では大量のデータを使うことも多いと思いますが、データが多いと前処理するのにも時間がかかって大変です。この記事では、Amazon SageMaker を使って、大量のデータを複数のインスタンスに分散させて処理をすることでデータ処理時間を短縮する方法をご紹介します。
Amazon SageMaker Processing とは
Amazon SageMaker Processing は、任意のコンテナと任意のスクリプトを使ってデータ処理ジョブを実行するための Amazon SageMaker の機能です。SageMaker のたいていの機能は API で呼び出して使用できるのですが、SageMaker Processing も API から使用することができます。こちらのサンプルノートブック を使いながら SageMaker Processing でデータ並列処理をする方法をご紹介します。このサンプルノートブックは Amazon SageMaker ノートブックインスタンスでの実行を想定しています。
データ処理用スクリプトをコンテナイメージの中に入れてしまってそれを SageMaker Processing で実行することももちろん可能ですが、スクリプトだけ変えてデータ処理したい場合に既存のコンテナイメージを使いまわせるため、このサンプルノートブックの方法が個人的におすすめです。
データ並列処理
ここからは、サンプルノートブックのコードを確認していきます。
ノートブック実行環境
このサンプルノートブックは以下のサービスを使用します。ノートブックインスタンス作成時の手順の中で新しい IAM Role を作成すれば、サンプルノートブックを実行する権限を得ることが可能です。ノートブックインスタンスの起動方法は こちらの記事 をご参照ください。
- Amazon SageMaker
- Amazon ECR: SageMaker Processing で使用するコンテナイメージ格納用
- Amazon S3: SageMaker Processing の入出力データ保存用
docker 用ディレクトリの移動
ノートブックインスタンスのデフォルトのパーティションの構成だと、docker build を繰り返すうちにすぐにパーティションの容量がいっぱいになってしまうため、docker 関連ファイルを保存するパスを容量に余裕があるパーティションに移動します。
%%bash
sudo service docker stop
sudo mv /var/lib/docker /home/ec2-user/SageMaker/docker
sudo ln -s /home/ec2-user/SageMaker/docker /var/lib/docker
sudo service docker start
データの取得と S3 へのアップロード
このサンプルノートブックでは、MNIST 手書き数字データセットの画像を任意の倍率でリサイズする処理を SageMaker Processing で実行します。そのため、以下のコードで MNIST データセットの取得と、そのうちのテスト用データの 10000画像を S3 へのアップロードを行います。
!aws s3 cp s3://fast-ai-imageclas/mnist_png.tgz . --no-sign-request
!tar -xvzf mnist_png.tgz
data_dir = 'mnist_png/testing'
inputs = sagemaker_session.upload_data(path=data_dir, bucket=bucket, key_prefix=prefix)
print('input spec (in this case, just an S3 path): {}'.format(inputs))
コンテナイメージのビルドと Amazon ECR へのプッシュ
SageMaker Processing では、任意のコンテナイメージを使用することができます。requirements.txt と Dockerfile を使ってコンテナイメージをビルドし、Amazon ECR にプッシュします。
def build_and_push_image(repo_name, docker_path, extra_accounts=[], tag = ':latest'):
uri_suffix = 'amazonaws.com'
repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, repo_name + tag)
!docker build -t $repo_name $docker_path
for a in extra_accounts:
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {a}.dkr.ecr.{region}.amazonaws.com
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com
!aws ecr create-repository --repository-name $repo_name
!docker tag {repo_name + tag} $repository_uri
!docker push $repository_uri
return repository_uri
image_repository_uri = build_and_push_image(project_name + '-prepro-' + user_name, './docker/prepro')
SageMaker Processing Job の実行
ここからが、肝心のデータ並列処理実行部分です。まずは Processor を作成します。このとき、パラメタ instance_count
で、データ処理に使用したいインスタンスの数を指定します。サンプルノートブックでは 4台のインスタンスを使用する設定にしています。
processor = Processor(
image_uri=image_repository_uri,
entrypoint=["python3", f"{code_path}/prepro.py"],
# env={"ENV": "value"},
role=role,
instance_count=4,
instance_type="ml.t3.medium"
)
次に、データ処理内容が記載されたスクリプトファイルを S3 にアップロードします。サンプルノートブックでは、データ処理スクリプトはひとつだけですが、スクリプトファイルが複数に分かれている場合はノートブックインスタンスの code ディレクトリにそれらを格納しておけば、同様の方法で利用可能です。
code_s3_path = sagemaker_session.upload_data(
SCRIPT_LOCATION,
bucket=bucket,
key_prefix=os.path.join(project_name, user_name, "code", timestamp),
)
このスクリプトの 32行目あたりに以下の記述があります。Processing Job が使用するインスタンスの情報が /opt/ml/config/resourceconfig.json
に記載されており、複数インスタンスを使用した場合にスクリプトがどのインスタンスで実行されているのかを把握することができます。
with open('/opt/ml/config/resourceconfig.json') as f:
host_settings = json.load(f)
current_host = host_settings['current_host']
print('current_host:', current_host)
run() API を使って SageMaker Processing Job を実行します。inputs
に入力データの情報を設定します。このサンプルノートブックでは、データ処理用スクリプトと MNIST 画像が入力データです。ここで注目したいのは MNIST 画像を指定している部分で、s3_data_distribution_type
に ShardedByS3Key
を設定しています。これは、使用するインスタンスに入力データを均等に分配せよという指示を SageMaker Processing にしていることになります。
データ並列処理には関係ありませんが、outputs
で s3_upload_mode='Continuous'
としています。デフォルト設定では、処理結果のデータは Processing Job 終了後に一括で S3 にアップロードされますが、この様に設定しておくとデータ処理結果をインスタンスに保存するたびに S3 に都度アップロードすることができます。この設定はデータ処理の進捗をチェックするために便利かもしれません。
processor.run(
job_name=job_name,
inputs=[
ProcessingInput(
input_name='code',
source=code_s3_path,
destination=code_path),
ProcessingInput(
input_name='data',
source=inputs,
destination=input_path,
s3_data_distribution_type='ShardedByS3Key')],
outputs=[
ProcessingOutput(
output_name='result',
source=output_path,
destination=output_s3_path,
s3_upload_mode='Continuous')],
arguments=['--code-path', code_path,
'--input-data-path', input_path,
'--output-data-path', output_path,
'--scale', '2.0'],
logs=False,
wait=False
)
データ処理時間の比較
使用するインスタンス数を変えて同じデータ処理を実行した結果は以下の通りです。複数回試行して平均を取るなどしていないのであまり正確ではないですが、傾向は掴めると思います。おおよそインスタンスを増やした分だけ(4つのインスタンスで 4.0倍の高速化)データ処理時間を短縮できていそうです。
インスタンス数 | 処理時間 [sec] | 処理時間 [min] | 時間短縮効果 |
---|---|---|---|
1 | 10339.2 | 172.32 | 1.0 |
2 | 5134.19 | 85.6 | 2.0 |
4 | 2592.352 | 43.2 | 4.0 |
8 | 1312.864 | 21.9 | 7.9 |
Processing Job を実行すると、インスタンスの起動、コンテナイメージの実行、入力データのダウンロードなどの準備が実行された後でデータ処理が行われます。そのため、データ処理が軽量な場合は準備部分が支配的になって処理時間短縮効果が期待ほど得られない可能性がありますのでご注意ください。ためしに処理内容を軽くして(仕込んでいた 1秒の sleep を 0.1 秒に短縮)同様の実験をしたところ、 4つのインスタンスで 3.6 倍の高速化となりました。先ほどは 4.0 倍だったのでやや短縮効果が低くなっています。
インスタンス数 | 処理時間 [sec] | 処理時間 [min] | 時間短縮効果 |
---|---|---|---|
1 | 1228.252 | 20.5 | 1.0 |
4 | 336.545 | 5.6 | 3.6 |
おわりに
以上、Amazon SageMaker Processing を使ってデータ並列処理を実現する方法をご紹介しました。Processing Job で使用するインスタンス数を 2以上に設定し、入力データを各インスタンスに分配するよう設定するだけで簡単にデータ並列処理を実現できることがわかったと思います。ローカル PC などで使用している処理スクリプトをほとんどそのまま流用できるので、データ処理時間を短縮したいなと思っていたらぜひお試しください。