Help us understand the problem. What is going on with this article?

AI Platform Trainingで機械学習モデルの学習バッチを作った話

この記事は、MicroAd Advent Calendar 2020の12日目の記事です。

はじめに

GCP上でバッチ処理を行うにはCloud Functions、Cloud Run、AppEngineなど色々と選択肢がありますが、これらにはタスク完了まで10~15分以内という時間制限があり、数時間・数日のオーダーである機械学習モデルのトレーニングには適していません。

一方でAI Platform Trainingは計算リソースが柔軟に変更でき、時間制限がなかったりと機械学習をするには最高の環境ですが、スケジューリング機能がないためバッチ処理には不向きです。

そこでこの記事ではCloud SchedulerとAI Platform Trainingを連携させ、機械学習モデルの学習処理を定期実行する方法をご紹介しようと思います。

Cloud Schedulerの設定

とりあえず AI Platform Training のジョブ作成リクエストを投げるスケジューラーを組んでみます。

設定が必要なのは基本4項目。

  • 頻度:cronで指定するバッチ処理の実行タイミング
  • JobId: AI Platform Trainingに作成するジョブの名前
  • masterType: 学習に使用するマシンタイプ
  • masterConfig: 学習に使用するDockerImage(詳しくはこちら

scheduler_sample

しかしこれだけではうまく動きません。
この設定でも一度はAI Platformのジョブを作成できるのですが、2回目の実行では次のようなエラーが出てしまいます。

{
  "error": {
    "code": 409,
    "message": "Field: job.job_id Error: A job with this id already exists.",
    "status": "ALREADY_EXISTS",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.BadRequest",
        "fieldViolations": [
          {
            "field": "job.job_id",
            "description": "A job with this id already exists."
          }
        ]
      }
    ]
  }

どうやらAI Platform TrainingのジョブIDは一意である必要があり、タスク実行毎に変更しなければいけないようです。

ですがCloud Scheduler単体ではリクエストを動的に生成する機能はないので、Cloud SchedulerだけでAI Platformをスケジューリングするのは無理なようです。

しょうがないので間にCloud Functionを経由してジョブIDにタイムスタンプを追加することで無理やりこの問題を解決しました。

実際に作ったCloud Functionはこんな感じです。Cloud Schedulerから送られてきたリクエストにタイムスタンプを追加するシンプルな構成になっています。

import time
import json
import base64
from googleapiclient import discovery, errors


def create_job(event, context):
    """
    Triggered from a message on a Cloud Pub/Sub topic.

    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    project_id = 'projects/test-project'
    ml = discovery.build('ml', 'v1')

    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    print(payload)

    timestamp = str(int(time.time()))
    payload['jobId'] += '_' + timestamp
    request = ml.projects().jobs().create(parent=project_id, body=payload)

    try:
        response = request.execute()
        print(response)
    except errors.HttpError as error:
        print(error._get_reason())

スケジューラーの方もHTTPリクエストを投げるのではなくPub/SubでCloud Functionを呼び出すよう合わせて修正。

scheduler_sample
新しいスケジューラを動かすと、、、

スクリーンショット 2020-12-11 18.43.03.png

Cloud Function経由でジョブが作成されているのを確認できました。
期待どうりタイムスタンプが付けられているので、ジョブIDが重複せず何度でもジョブを作成できます。

ちなみに

今回利用した AI Platform Training のブラッシュアップ版である AI Platform(統合型)がリリースされました。まだプレビュー段階なのでリージョンで日本を選択できなかったりと制限が多く業務利用は厳しかったりもしますが、ディスク容量やOSなど計算リソースをより細かく指定できるようになっているようです。今後のアップデートに期待大ですね。

microad
データとテクノロジーをかけ合わせたマーケティングプラットフォームを提供する会社です。
https://www.microad.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away