LoginSignup
0
0

【AWS SageMaker】NotebookインスタンスにてProcessing(処理ジョブ)を行ってみた

Posted at

背景

AWS SageMakerを勉強していると、学習で用いる「トレーニング > トレーニングジョブ」とは別に、「Processing > 処理ジョブ」がありました。処理ジョブ(Processing Job)の方は、データの前処理などを行う用途のようですので、理解を深めるために触ってみました。

行った事(概要)

音声通話をテキスト化するシステムのサンプルファイルが手元にありましたので、こちらを前処理のネタにしてみようと思います。
前処理の内容としては、通話内の話者毎(OPとCU)の発話で1行ずつ音声テキスト化のデータが入っているので、これを、通話毎に1行ずつの音声テキストのデータが入るようにしてみます。

具体的には、前処理の前はこのような形で、

_キャプチャ1.png
_キャプチャ2.png

前処理の後はこのような形にしてみます。

_キャプチャ3.png


※Notebookインスタンスの.ipynbだけで出来てしまう程度のものですが、今回はSageMaker Processingの勉強として、敢えてProcessingで行ってみます。

行った事(詳細)

1. データを保存するS3を用意

SageMaker用に作成済みのS3バケットに、「for_sagemaker_processing_job」というフォルダを新規に作成しました。そして、そのフォルダに入力データとなるCSVファイルを「conversation.csv」というファイル名で保存しました。

_キャプチャ5.png

2. SageMaker Notebookインスタンス内に.ipynbファイルを作成

カーネルは「conda_python3」を選びました。

_キャプチャ4.png

3. 作成した.ipynbファイルにPythonコードを記載 ~その1~

3.1. 必要なライブラリーをインポート
hogehoge.ipynb
from datetime import datetime
import os
import numpy as np
import pandas as pd
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput
from sagemaker.processing import ProcessingOutput
3.2. 初期変数を設定
hogehoge.ipynb
ROLE = sagemaker.get_execution_role()
SAGEMAKER_SESSION = sagemaker.Session()
S3_BUCKET = SAGEMAKER_SESSION.default_bucket()
S3_PREFIX = "data-for-machine-learning/test/for_sagemaker_processing_job"
CONTAINER_FOLDER = "/opt/ml/processing"
PROCESSING_JOB_FILENAME = "test_processing_job.py"
INPUT_FILENAME = "conversation.csv"
OUTPUT_FILENAME = "conversation_after_preprocess.csv"

変数ROLEは、このNotebookインスタンスに割り当てられているロール名を保存します。
変数SAGEMAKER_SESSIONは、セッション情報を保存します。
これらは、sagemakerライブラリーを触る時に使います。

変数S3_BUCKETS3_PREFIXは、先程作成したS3のバケット名とフォルダ名になります。
変数CONTAINER_FOLDERは、実際にProcessing Jobが走るインスタンス(コンテナ)上でデータを扱う際は、この「/opt/ml/processing」フォルダ内で扱う事がお作法になっているらしいので、文字列「/opt/ml/processing」を変数CONTAINER_FOLDERに保存します。

変数PROCESSING_JOB_FILENAMEは「Processing Jobで行う処理をPythonで記載した.pyファイルのファイル名」、
変数INPUT_FILENAMEは「先程S3に保存した入力データとなる前処理前のファイル名」、
変数OUTPUT_FILENAMEは「Processing Jobの出力データとなる前処理後のファイル名」
になります。

3.3. ファイルやフォルダのパスを変数に保存

この後に、sagemakerライブラリーのrunメソッドを使ってProcessing Jobを起動させる際、runメソッドの引数に、「プログラムの場所は?」「入力データの場所は?」「出力データの出力先は?」などの情報が必要になるので、分かりやすいようにそれぞれのパスを変数に保存します。

hogehoge.ipynb
s3_processing_job_path = "s3://" + os.path.join(S3_BUCKET, S3_PREFIX, PROCESSING_JOB_FILENAME)
print(s3_processing_job_path)
s3_input_data_path = "s3://" + os.path.join(S3_BUCKET, S3_PREFIX, INPUT_FILENAME)
print(s3_input_data_path)
container_input_folder_path = CONTAINER_FOLDER + "/my_data/input"
print(container_input_folder_path)
container_output_folder_path = CONTAINER_FOLDER + "/my_data/output"
print(container_output_folder_path)
container_output_data_path = os.path.join(container_output_folder_path, OUTPUT_FILENAME)
print(container_output_data_path)
s3_output_folder_path = "s3://" + os.path.join(S3_BUCKET, S3_PREFIX)
print(s3_output_folder_path)
s3://sagemaker-ap-northeast-1-*******/data-for-machine-learning/test/for_sagemaker_processing_job/test_processing_job.py
s3://sagemaker-ap-northeast-1-*******/data-for-machine-learning/test/for_sagemaker_processing_job/conversation.csv
/opt/ml/processing/my_data/input
/opt/ml/processing/my_data/output
/opt/ml/processing/my_data/output/conversation_after_preprocess.csv
s3://sagemaker-ap-northeast-1-*******/data-for-machine-learning/test/for_sagemaker_processing_job

ここで、一旦.ipynbファイルから離れて、Processing Jobでの処理を記載する.pyファイルを作成します。

4. Processing Jobでの処理を記載する.pyファイルを作成

4.1. .pyファイルを作成

.pyファイルの作成はローカルでもNotebookインスタンスでも大丈夫です。
ファイル名は、先程の変数PROCESSING_JOB_FILENAMEで設定した「test_processing_job.py」にします。

4.2. 処理内容を記載

test_processing_job.pyに処理内容を記載します。

test_processing_job.py
import os
import sys
import numpy as np
import pandas as pd

# インスタンス(コンテナ)上にフォルダを作成
os.makedirs(sys.argv[1], exist_ok=True)
os.makedirs(sys.argv[2], exist_ok=True)
# S3からインスタンス(コンテナ)上にコピーされた前処理前データを読み込み
conversation2text_df = pd.read_csv(filepath_or_buffer="{a}/conversation.csv".format(a=sys.argv[1]),
                                   encoding="shift-jis")
start_flag = True  # 1行目かどうかのフラグ
conversation_id_text_dict = {}  # 会話IDと会話テキストを保存するdict
for index, row in conversation2text_df.iterrows():
    conversation_id = row["通話ID"]
    conversation_channel = row["音声のチャンネル種類"]
    conversation_text = row["発言内容(認識結果)"]
    if start_flag == True:  # 1行目だった場合
        pre_conversation_id = row["通話ID"]
        if conversation_channel == "OP":
            conversation_all_text = "オペレータ「" + conversation_text + ""
        elif conversation_channel == "CU":
            conversation_all_text == "客「" + conversation_text + ""
        else:
            continue
        start_flag = False
        continue
    else:  # 2行目以降だった場合
        if conversation_id == pre_conversation_id:  # まだ同じ会話(1つ前のデータの会話IDと同じ)の場合
            if conversation_channel == "OP":
                conversation_all_text = conversation_all_text + "オペレータ「" + conversation_text + ""
            elif conversation_channel == "CU":
                conversation_all_text = conversation_all_text + "客「" + conversation_text + ""
            else:
                continue
        else:  # 違う会話(1つ前のデータの会話IDと違う)の場合
            conversation_id_text_dict[pre_conversation_id] = conversation_all_text  # dictに前の会話IDと会話テキストを保存
            if conversation_channel == "OP":
                conversation_all_text = "オペレータ「" + conversation_text + ""
            elif conversation_channel == "CU":
                conversation_all_text = "客「" + conversation_text + ""
            else:
                continue
            pre_conversation_id = conversation_id
conversation_id_text_dict[conversation_id] = conversation_all_text  # 一番下の会話は上記のロジックを通らないので、ここで拾う
result_id_text_df = pd.DataFrame(data={"conversation_id": list(conversation_id_text_dict.keys()),
                                       "conversation_text": list(conversation_id_text_dict.values())})
# 前処理後データのDataFrameを、CSVファイルとしてインスタンス(コンテナ)上のフォルダに保存
result_id_text_df.to_csv(path_or_buf=sys.argv[3],
                         index=False,
                         encoding="shift-jis")

あまりスマートなコードでなくてゴメンなさい。
処理内容としては、話者がOPの場合はテキストをOP「」で囲って、話者がCUの場合はテキストを客「」で囲った後、通話毎に1つ(1行)のテキストとしています。

コードの中で、sys.argv[]を使って、プログラム実行時の引数を引き込んでいますが、この引数は、Processing Jobを起動する際のsagemakerライブラリーのrunメソッドの際に指定しますので、改めて後述します。

5. 作成した.pyファイルをS3に保存

前処理前のデータであるCSVファイルを保存したフォルダと同じフォルダに、作成した.pyファイルを保存します。

_キャプチャ6.png

6. 作成した.ipynbファイルにPythonコードを記載 ~その2~

先程作成した.ipynbファイルに続きのPythonコードを書いていきます。

6.1. Processing Jobで用いるDockerイメージを設定

実際にProcessing Jobが走るのは、当該Notebookインスタンスとは別のインスタンスになります。そのインスタンスのコンテナ上でProcessing Jobは走るので、コンテナのためのDockerイメージを設定します。

SageMaker Processingでは、「AWS側で用意済みのDockerイメージ」と「自分で用意したDockerイメージ」の大きくは2パターンがありまして、今回は設定が簡単な前者にします。

前者にもいくつかDockerイメージの種類があります。今回は「sckit-learn」が使えるように用意されているDockerイメージを使います。
(今回の前処理ではsckit-learnを全然使っていませんけれど(笑)、numpyやpandasも普通に使えたので、このDockerイメージを選択しました。)

hogehoge.ipynb
sklearn_processor = SKLearnProcessor(framework_version="1.2-1",
                                     role=ROLE,
                                     instance_count=1,
                                     instance_type="ml.m5.large")
print(type(sklearn_processor))
<class 'sagemaker.sklearn.processing.SKLearnProcessor'>

SKLearnProcessorは、sagemaker.sklearn.processingから呼び出しています。
引数の意味はそれぞれ下記になります。
■framework_version
 利用するsckit-learnのバージョンになります。
 こちらのサイトの「Supported Scikit-learn version」から選択します。

■role
 割り当てるロールになります。
■instance_count
 Processing Jobを走らせるインスタンスの数になります。
■instance_type
 Processing Jobを走らせるインスタンスの種類になります。

6.2. Processing Jobを起動

それでは、runメソッドを使って、Processing Jobを起動して走らせます。

hogehoge.ipynb
now = datetime.now()
year = str(now.year)
month = str(now.month)
day = str(now.day)
hour = str(now.hour)
minute = str(now.minute)
second = str(now.second)
job_timestamp = year + month + day + hour + minute + second
sklearn_processor.run(code=s3_processing_job_path,
                      inputs=[ProcessingInput(source=s3_input_data_path,
                                              destination=container_input_folder_path)],
                      outputs=[ProcessingOutput(source=container_output_folder_path,
                                                destination=s3_output_folder_path)],
                      job_name="test-processing-job-{a}".format(a=job_timestamp),
                      arguments=[container_input_folder_path,
                                 container_output_folder_path,
                                 container_output_data_path],
                      wait=True,
                      logs=True)

タイムスタンプの部分はスマートでなくて、ゴメンなさい。

先程のSKLearnProcessorクラスで作成したオブジェクトにてrunメソッドを用います。
引数の意味はそれぞれ下記になります。
■code
 Processing Jobでの処理内容が書かれているプログラムファイルの保存場所。
■inputs
 「どのデータを入力データとしてインスタンス(コンテナ)に渡すか」「渡された入力データをインスタンス(コンテナ)のどこに保存するか」を、ProcessingInputメソッドを使って設定します。
 前者はProcessingInputメソッドの引数sourceに、後者はProcessingInputメソッドの引数destinationに設定します。
 また、細かい仕様を理解出来てはいませんが、試した限りですと、引数sourceは「ファイル」のパスを、引数destinationは「フォルダ」のパスを設定すると、想定通りに動いてくれました。
■outputs
 「インスタンス(コンテナ)のどのデータを出力データとするか」「インスタンス(コンテナ)からの出力データをどこに保存するか」を、ProcessingOutputメソッドを使って設定します。
 前者はProcessingOutputメソッドの引数sourceに、後者はProcessingOutputメソッドの引数destinationに設定します。
 こちらも、細かい仕様を理解出来てはいませんが、試した限りですと、引数sourceは「フォルダ」のパスを、引数destinationも「フォルダ」のパスを設定すると、想定通りに動いてくれました。イメージとしては、引数sourceで設定したフォルダ内の全てのファイルが、引数destinationで設定したフォルダ内にコピーされるように思われます。
■job_name
 Processing Jobとしての名前。
■arguments
 Processing Job側に渡す引数。
 Pythonならば、プログラム側でsys.argv[要素番号]等を用いて、渡された引数を使えます。
■wait
 Notebookインスタンス側でProcessing Jobの完了を待つかどうか。
■logs
 Notebookインスタンス側でProcessing Job実行時のログを表示するかどうか。


runメソッドを行うと、下記のようなログが表示され、しばらく待つと処理が完了しました。

INFO:sagemaker:Creating processing-job with name test-processing-job-20245229710
....................................
..

7. Processing Jobの結果を確認

まずは、マネジメントコンソールのSageMakerから「Processing > 処理ジョブ」を確認してみます。

_キャプチャ7.png

対象のjobnameのProcessing Jobがありました。ステータスもCompletedです。
クリックしてみると、runメソッドで設定した引数の内容や、その他にも実際に要した処理時間(秒)の記載などがありました。


続いて、インスタンス(コンテナ)からの出力データの保存先として設定していたS3のフォルダを確認してみます。

_キャプチャ8.png

インスタンス(コンテナ)上でのファイル名「conversation_after_preprocess.csv」と同じファイル名のファイルがあります。
こちらのファイルをダウンロードして中身を確認してみると、

_キャプチャ3.png

想定通りの前処理後の形になっていました。


以上になります。

まとめ

簡易な前処理の内容でしたが、SageMaker Processingの動きを何となく理解する事が出来ました。前処理の内容を記載したプログラムファイルや、入力データはS3上にあるものを利用出来るというのが便利と思いました。
また、前処理の内容を記載したプログラムファイルは、S3上でなく、Notebookインスタンスのローカル上にあっても、runメソッドの引数inputsのsourceに当該データのパスを設定すれば、利用出来るようです。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0