3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Azure ML Python SDK を使う5:パイプラインの基本

Last updated at Posted at 2020-12-27

今回の内容

今回は複数のタスク(例えばデータ準備とモデルトレーニングにような)を、ひとつのパイプラインとして実行する機能を紹介します。
完成したパイプラインはエンドポイントとして公開できるようになり、自動化のパーツとすることができます。それ自身でスケジュール実行もできますし、呼出し時にパラメタを渡すこともできますが、今回は最も基本的なサンプルで説明しようと思います。

ここでのシナリオ

いま STEP1 と STEP2 の2つのタスクをシーケンシャルに実行するパイプラインを考えます。(下図の「パイプラインの構成」)

  • STEP1 は input_data1 からファイル "HelloWorld.txt" を受取り、中間ファイル "output.csv" を inout に格納します
  • STEP2 は inout のファイルを取出し、そのまま output2 に格納します

input_data1, inout, output2 はそれぞれスクリプト上のオブジェクト名なので、実体としては demostore という名前で登録された blog ストレージのフォルダ、work, work_out, work_out2 が参照されます。(下図の「ストレージの構成」)
また Nootebook のローカルには script_pipeline フォルダが準備されていて、STEP1 と STEP2 の処理内容が script2.py と script2-1.py に実装されているものとします。(下図の「スクリプトフォルダの構成」)

20220705_2.png

手順

これまでどおり手順をみていきます。

  1. パッケージの読込み

    パッケージを読込みます。

    from azureml.core import Workspace, Experiment, Dataset, Datastore
    from azureml.core.compute import ComputeTarget
    from azureml.core.compute_target import ComputeTargetException
    from azureml.core.runconfig import RunConfiguration
    from azureml.core.conda_dependencies import CondaDependencies
    from azureml.data import OutputFileDatasetConfig
    from azureml.pipeline.core import Pipeline
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.widgets import RunDetails
    
    workspace = Workspace.from_config()
    
  2. コンピューティングクラスターの指定

    コンピューティングクラスターを指定します。

    aml_compute_target = "demo-cpucluster1"  # <== The name of the cluster being used
    try:
        aml_compute = ComputeTarget(workspace, aml_compute_target)
        print("found existing compute target.")
    except ComputeTargetException:
        print("no compute target with the specified name found")
    
  3. input と output フォルダの指定

    demostore は Azure ML Workspace に登録したデータストア名です。データストアの BLOB コンテナ内のファイルパスを dataset クラスに渡しています。

    ds = Datastore(workspace, 'demostore')
    input_data1 = Dataset.File.from_files(ds.path('work/HelloWorld.txt')).as_named_input('input_ds').as_mount()
    
    inout = OutputFileDatasetConfig(destination=(ds, 'work_out'))
    output2 = OutputFileDatasetConfig(destination=(ds, 'work_out2')) # can not share output folder between steps
    

    これは、Azure ML Python SDK を使う3:アウトプットを Blob ストレージに書きだす - その1 で示したインプット/アウトプットのペアと同様ですが、ここでは OutputFileDatasetConfig で定義されるアウトプットが 2つある点が異なります。またこの時点では inout が中間ファイル置き場であることが関連づけられていませんが、これは後続セルで関連づけられます。

  4. 環境の指定

    次のセルで使用する PythonScriptStep では、これまでの例で使った Environment() ではなく、RunConfiguration() を使う必要があります。

    run_config = RunConfiguration()
    run_config.environment.python.conda_dependencies = CondaDependencies(conda_dependencies_file_path="conda.yml")
    

    conda.yml はノートブックと同じフォルダに配置されているものとします。

    name: my-env
    dependencies:
    - python=3.8.5
    
    - pip:
      - azureml-defaults
    channels:
    - anaconda
    - conda-forge
    
  5. 実行ファイル名の指定

    これまでのサンプルでは ScriptRunConfig によって実行ファイル名や環境を構成していましたが、パイプラインでは PythonScriptStep を使います。(パイプラインの内容によってはこれに限りません。)
    引数の構成は ScriptRunConfig とほとんど同じです。
    パイプラインは複数のタスクをひとまとめにしてコンピュートクラスターに投げこみますが、ここでは STEP1 と STEP2 を準備します。

    step1 = PythonScriptStep(name="step1",
                             script_name="script2.py", 
                             source_directory='script_pipeline',
                             arguments =['--datadir', input_data1, '--output', inout],
                             compute_target=aml_compute, 
                             runconfig=run_config,
                             allow_reuse=True)
    
    step2 = PythonScriptStep(name="step2",
                             script_name="script2-1.py", 
                             source_directory='script_pipeline',
                             arguments =['--datadir', inout.as_input(name='inout'), '--output', output2],
                             compute_target=aml_compute, 
                             runconfig=run_config,
                             allow_reuse=True)
    

    STEP1 のアウトプットとして定義された inout が、STEP2 のインプットで .as_input() として定義しなおされています。これによって STEP1 と STEP2 が間接的に関連づけられ、同時に inout が中間ファイル置き場であることが指定されます。
    この関連づけがないと、STEP1 と STEP2 はパラレル実行されることになります。(ただしパラレル実行する場合 STEP1 のアウトプットフォルダ名と STEP2 のインプットフォルダ名は異なっている必要があります)

  6. パイプラインの定義

    パイプラインを定義します。

    pipeline1 = Pipeline(workspace=workspace, steps=[step1, step2])
    

    パイプラインの構成は事前に検証することができます。

    pipeline1.validate()
    print("Pipeline validation complete")
    
  7. パイプランの実行

    パイプラインを実行します。

    pipeline_run = Experiment(workspace, 'pipeline-test').submit(pipeline1, regenerate_outputs=False)
    

    パイプラインに限ったことではありませんが、RunDetails を使って実行状況をモニタリングできます。これまでご紹介していなかったのでここで使ってみます。

    RunDetails(pipeline_run).show()
    

    このウィジェットに以下のような実行状態が表示されます。
    この実行グラフは Azure ML Studio でも参照することができます。

    Azureml6-2.png

  8. パイプライン終了の待機

    パイプラインの終了を待ちます

    %%time
    pipeline_run.wait_for_completion(show_output=True)
    
  9. STEP1, STEP2 の内容

    参考のためスクリプトの内容を例示します。これまで使ってきたサンプルとほとんど同じつくりにしています。
    script2.py

    import argparse
    import os
    
    print("*********************************************************")
    print("*************          Hello World!         *************")
    print("*********************************************************")
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--datadir', type=str, help="data directory")
    parser.add_argument('--output', type=str, help="output")
    args = parser.parse_args()
    
    print("Argument 1: %s" % args.datadir)
    print("Argument 2: %s" % args.output)
        
    with open(args.datadir, 'r') as f:
        content = f.read()
        with open(os.path.join(args.output, 'output.csv'), 'w') as fw:
            fw.write(content)
    

    script2-1.py

    import argparse
    import os
    
    print("*********************************************************")
    print("*************          Hello World!         *************")
    print("*********************************************************")
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--datadir', type=str, help="data directory")
    parser.add_argument('--output', type=str, help="output")
    args = parser.parse_args()
    
    print("Argument 1: %s" % args.datadir)
    print("Argument 2: %s" % args.output)
    
    for fname in next(os.walk(args.datadir))[2]:
        print('processing', fname)
        with open(os.path.join(args.datadir, fname),'r') as f:
            content = f.read()
            with open(os.path.join(args.output, fname), 'w') as fw:
                fw.write(content)
    

    なお、ここで使った HelloWorld.txt は非常に簡単なファイルです。

    0,Hello World
    1,Hello World
    2,Hello World
    

おわりに

いかがでしょうか。パイプラインをつくることで Machine Learning タスクが自動化できるようになります。MLOps の基本構成要素として非常に重要な機能を提供します。

参考資料

Azure Machine Learning Pipelines: Getting Started
PythonScriptStep Class
Azure ML Python SDK を使う1:データセットをインプットとして使う - その1
Azure ML Python SDK を使う2:データセットをインプットとして使う - その2
Azure ML Python SDK を使う3:データセットをインプットとして使う - その1
Azure ML Python SDK を使う4:データセットをインプットとして使う - その2
Azure/MachineLearningNotebooks
Azure Machine Learning SDK for Python とは

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?