80
59

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Cloud RunでサーバレスMLOpsを試してみた

Posted at

この記事はMLOps Advent Calendar 2020の17日目の記事です。

もともとはGoogle Cloudからもうすぐ出てくる予定のFeature Storeの紹介でも書こうかと思ったのですが、まだ出てこないな...と思ってたら、Google Cloud ソリューションアーキテクトの中井さん(@enakai00)がいい感じのサーバレスMLOpsソリューションをタイミングよく公開されてたので、紹介させていただくことにしました。

なぜサーバレスMLOpsか?

サーバレスMLOpsって言葉、中井さんのツイートを見て思いついたのですが、もうどこかで使われてますかね? なぜサーバレスにしたいかという前提として、中井さんも書かれてた「k8s/GKEはそれほど手軽じゃない」問題があります。

そこそこの規模の実運用環境であれば、AI Platform Pipelines(Kubeflow Pipelinesのマネージドサービス)がGoogle Cloudにおける定番のMLOps基盤になります。とはいえ、これ下はGKE(マネージドk8s)で動いてます。つまり、VMを起動しておく必要があり、一定額のコストが発生します。

AI Platform Pipelinesの構成
AI Platform Pipelinesの構成

そこまでの規模も予算もないよ、って用途におすすめしたいのがサーバレスMLOpsです。中井さんが提案されたソリューションのように、Google CloudであればCloud Runを使ってすべてをサーバレスで構築すれば、パイプラインを起動してないときはインスタンス代も発生しない(ストレージ代のみ)、というお手軽MLOpsが実現できます。あ、インスタンス止めるの忘れてた! 課金されてた...って残念なことも起きません。

Screen Shot 2020-12-17 at 17.17.58.png

サンプルの概要

中井さんが公開されたサンプルworkflows-ml-pipeline-exampleはこんな構成になっています。

このサンプルの例では、babyweightという赤ちゃんの体重を予測する既存のモデルをパイプライン化しています。大きく分けると、

  1. 上段:BigQueryからデータを取得、Cloud Dataflowで前処理してCloud Storageに保存
  2. 下段:AI Platform Trainingでモデル学習、AI Platform Predictionにデプロイ

という2段階のパイプラインになっており、各段階をCloud Runで実行します。またサンプルの後半では、この前処理と学習のオーケストレーションをCloud Workflowsで自動化する例も出てきます。

Cloud Runを動かしてみた

GCPプロジェクトを用意して、READMEに記載されているサンプルコードをコピペしていくだけですんなり動きました。PROJECT_IDに自分のプロジェクトIDをセットし、あとは他の環境変数の設定やGCSバケットの作成、そして関連するAPIを有効に設定して、サンプルコードをcloneします。

Cloud Dataflowで前処理

で、まずは前処理を実行するCloud Runのコンテナイメージをビルドします。

gcloud builds submit --tag gcr.io/$PROJECT_ID/preprocess-service

このコンテナイメージには、サンプルに用意された以下のような前処理のコードが組み込まれます。

services/preprocess/df_pipeline/pipe.py
def preprocess(limit, output_dir, job_name, region):
    options = {
        'region': region,
        'staging_location': os.path.join(output_dir, 'tmp', 'staging'),
        'temp_location': os.path.join(output_dir, 'tmp'),
        'job_name': job_name,
        'project': PROJECT_ID,
        'max_num_workers': 3,   # CHANGE THIS IF YOU HAVE MORE QUOTA
        'setup_file': './setup.py'
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    RUNNER = 'DataflowRunner'
    p = beam.Pipeline(RUNNER, options=opts)
    query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """
  
    if limit:
        query = query + ' LIMIT {}'.format(limit)
  
    for step in ['train', 'eval']:
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
        else:
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)

        (p 
         | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
         | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
         | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(output_dir, '{}.csv'.format(step))))
        )

    return p.run()

この前処理では、BigQueryでクエリを実行してデータを読み込み、欠損値(超音波診断結果がない場合)の対応や文字列変換、trainとevalへの分割等を行っています。

Cloud Runは、この前処理ジョブの起動をRESTで受け付けるエンドポイントとして動きます。そのために、このサンプルでは以下のようなコードを用いています。

servies/preprocess/main.py
@app.route('/api/v1/preprocess', methods=['POST'])
def preprocess():

    <...snip...>

    result = pipe.preprocess(limit, output_dir, job_name, region)

    resp = {'jobName': job_name,
            'jobId': result.job_id(),
            'outputDir': output_dir}

    return resp, 200

Cloud Runでは、こんなふうにアノテーションを使ってエンドポイントURIと関数をマッピングできます。この例では、上述のDataflowコードpipe.preprocessを呼び出しているのが分かります。

このコンテナイメージのビルドが終わったら、Cloud Runにデプロイします。

gcloud run deploy preprocess-service \
  --image gcr.io/$PROJECT_ID/preprocess-service \
  --platform=managed --region=us-central1 \
  --no-allow-unauthenticated \
  --memory 512Mi \
  --set-env-vars "PROJECT_ID=$PROJECT_ID"

サーバレスなので、この段階ではインスタンスは生成されません。

Kerasで学習

同様にして、学習用のコンテナイメージもビルドします。これには以下のようなKerasのモデルが組み込まれます。

babyweight_model/trainer/model.py
def create_keras_model(learning_rate=0.001):
    wide, deep, inputs = get_wide_deep()
    feature_layer_wide = layers.DenseFeatures(wide, name='wide_features')
    feature_layer_deep = layers.DenseFeatures(deep, name='deep_features')

    wide_model = feature_layer_wide(inputs)

    deep_model = layers.Dense(64, activation='relu', name='DNN_layer1')(feature_layer_deep(inputs))
    deep_model = layers.Dense(32, activation='relu', name='DNN_layer2')(deep_model)

    wide_deep_model = layers.Dense(1, name='weight')(layers.concatenate([wide_model, deep_model]))
    model = models.Model(inputs=inputs, outputs=wide_deep_model)

    # Compile Keras model
    model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(lr=learning_rate))
    return model

このモデルでは、母親の年齢や妊娠週といった連続値は3層のMLPに入れ、性別や胎児数といったカテゴリ値は1層のNNに入れるという設計になっています。

download (1).png

このモデルに対して先程の前処理でCloud Storageに保存した学習データを使い学習を行いたいわけですが、そのためのCloud Runのエンドポイント定義は以下のようになります。

services/train/main.py
@app.route('/api/v1/train', methods=['POST'])
def train():
    json_data = request.get_json()

    scale_tier = 'BASIC_GPU'
    region = 'us-central1'
    runtime_version = '2.2'
    python_version = '3.7'
    data_dir = None
    job_dir = None
    num_train_examples = '60000000'
    num_eval_examples = '50000'
    num_evals = '100'
    learning_rate = '0.0001'

    <...snip...>

        training_inputs = {
            'scaleTier': scale_tier,
            'packageUris': [package],
            'pythonModule': 'trainer.task',
            'region': region,
            'jobDir': job_dir,
            'runtimeVersion': runtime_version,
            'pythonVersion': python_version,
            'args': [
                '--data-dir', data_dir,
                '--num-train-examples', num_train_examples,
                '--num-eval-examples', num_eval_examples,
                '--num-evals', num_evals,
                '--learning-rate', learning_rate
            ]
        }
        job_spec = {'jobId': job_id, 'trainingInput': training_inputs}

        credentials = GoogleCredentials.get_application_default()
        api = discovery.build(
            'ml', 'v1', credentials=credentials, cache_discovery=False)
        api_request = api.projects().jobs().create(
            body=job_spec, parent='projects/{}'.format(PROJECT_ID))
        resp = None
        try:
            resp = api_request.execute()
        except googleapiclient.errors.HttpError as err:
            resp = {'message': err._get_reason()}
            return resp, 500

        return resp, 200

ここでは、Google Cloudが提供するマネージドのML学習環境であるAI Platform Trainingを呼び出しています。学習に使う計算リソースやリージョン、学習率等のパラメータを渡して、学習ジョブを開始してるのがなんとなく分かるかと思います。

この学習用のコードも、前処理コードと同様にコンテナイメージにビルドし、Cloud Runにデプロイします。Cloud Runのコンソールを見ると、それぞれのCloud Runエンドポイントがデプロイされていることが確認できます。

Screen Shot 2020-12-17 at 17.56.03.png

テスト

この状態で、前処理と学習のそれぞれのCloud Runエンドポイントを叩いてテストできます。まずは前処理の方。

curl -X POST -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -H "Content-Type: application/json" \
  -d "{\"limit\":\"1000\", \"outputDir\":\"$BUCKET/preproc\"}" \
  -s $PREPROCESS_SERVICE_URL/api/v1/preprocess | jq .

このジョブが完了すると、Cloud Storageのバケットには前処理済みの学習データが保存されます。

Screen Shot 2020-12-17 at 18.33.31.png

つづいて、同様にして学習のエンドポイントも叩きます。学習ジョブが完了すると、AI Platformのモデルコンソールでは作成されたモデルの詳細を確認できます。

Screen Shot 2020-12-17 at 18.39.07.png

Cloud Workflowsでサーバレス オーケストレーション

さて、ここまではCloud Runを使ってVMを立てずに前処理や学習を起動する方法について見てきました。中井さんのサンプルコードでは、これらを組み合わせるパイプライン構築もサーバレスに実現されています。

このオーケストレーションには、サーバレス ワークフローツールであるCloud Workflows(ベータ版)を使用します。

Screen Shot 2020-12-17 at 18.41.35.png

Google CloudではCloud Composerというワークフローツールがすでに提供されていますが、ComposerはGKEの上にApache Airflowを載せる構成。一方、Workflowsはサーバレス実装となっており、Cloud Runと同様のお手軽感でオーケストレーションを実装できる新サービスです。このWorkflowsについてはYuttyさんによる日本語解説がありますので、合わせてご覧ください。

今回のサンプルコードでは、以下のようなYAMLファイルでワークフローが定義されています。

workflows/ml_workflow.yaml
main:
  params: [args]
  steps:
    - assignLimit:
        steps:
          - assignDefault:
              assign:
                - limit: null
          - assignArgVal:
              switch:
                - condition: ${"limit" in args}
                  assign:
                    - limit: ${args.limit}
    - preprocessData:
        call: http.post
        args:
          url: PREPROCESS-SERVICE-URL/api/v1/preprocess
          body:
            "limit": ${limit}
            "outputDir": ${args.bucket + "/preproc"}
          auth:
            type: OIDC
        result: preprocess
    - sleepPreprocess:
        call: sys.sleep
        args:
            seconds: 60
    - getPreprocessStatus:
        call: http.get
        args:
          url: ${"PREPROCESS-SERVICE-URL/api/v1/job/" + preprocess.body.jobId}
          auth:
            type: OIDC
        result: preprocessStatus
    - waitPreprocess:
        switch:
          - condition: ${preprocessStatus.body.currentState == "JOB_STATE_DONE"}
            next: trainModel
          - condition: ${preprocessStatus.body.currentState == "JOB_STATE_FAILED"}
            return: ${preprocessStatus.body}
        next: sleepPreprocess
    - trainModel:
        call: http.post
        args:
          url: TRAIN-SERVICE-URL/api/v1/train
          body:
            "jobDir": ${args.bucket + "/trained_model"}
            "dataDir": ${preprocess.body.outputDir}
            "numTrainExamples": ${args.numTrainExamples}
            "numEvalExamples": ${args.numEvalExamples}
            "numEvals": ${args.numEvals}
          auth:
            type: OIDC
        result: train
    - sleepTrain:
        call: sys.sleep
        args:
            seconds: 60
    - getTrainStatus:
        call: http.get
        args:
          url: ${"TRAIN-SERVICE-URL/api/v1/job/" + train.body.jobId}
          auth:
            type: OIDC
        result: trainStatus
    - waitTrain:
        switch:
          - condition: ${trainStatus.body.state == "SUCCEEDED"}
            next: deployModel
          - condition: ${trainStatus.body.state == "FAILED"}
            return: ${trainStatus.body}
        next: sleepTrain
    - deployModel:
        call: http.post
        args:
          url: TRAIN-SERVICE-URL/api/v1/deploy
          body:
            "modelName": ${args.modelName}
            "versionName": ${args.versionName}
            "deploymentUri": ${train.body.trainingInput.jobDir + "/export"}
          auth:
            type: OIDC
        result: deploy
    - finish:
        return: ${deploy.body}

上から眺めていけば、前処理→学習→デプロイという流れになっており、それぞれでCloud Runのエンドポイントを呼び出してるのが分かるかと思います。このワークフローも、コンソール上で詳細や実行結果を確認できます。

Screen Shot 2020-12-17 at 18.51.04.png

AI Platform Pipelines(Kubeflow Pipelines)のように、かっこいいUIでワークフロー表示したりメタデータや実験結果を管理したり...といった機能はありませんが、とりあえず前処理して学習してデプロイ、という流れの自動化にさくっと使えそうです。

まとめ

以上、中井さんが公開されたサンプルコードを見ながら、Google CloudでつくるサーバレスMLOps環境について解説しました。Google Cloudの魅力はなんといってもgVisorに基づくさまざまなサーバレス環境にあるので、今後はこうしたサーバレスなML開発&運用環境がどんどん増えてくるものと期待しています。


Disclaimer この記事は個人的なものです。ここで述べられていることは私の個人的な意見に基づくものであり、私の雇用者には関係はありません。

80
59
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
80
59

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?