LoginSignup
1
0

More than 5 years have passed since last update.

Cloud Composer(composer-1.1.1-airflow-1.9.0)では、MLEngineのruntime_version, python_versionが指定できない

Last updated at Posted at 2018-09-28

問題

Cloud ComposerでMLEngineTrainingOperatorを使ってxgboostのトレーニングジョブを実行した際、以下のエラーがでたので調査しました。

import xgboost as xgb ImportError: No module named xgboost

ソースコード

import datetime

from airflow import models
from airflow.contrib.operators.mlengine_operator import (MLEngineTrainingOperator)

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
now = datetime.datetime.now()

default_dag_args  = {
    'start_date': yesterday,
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'iris_train',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

    training_op = MLEngineTrainingOperator(
        project_id='durable-binder-547',
        job_id='test_training_' + now.strftime("%Y%m%d%H%M%S"),
        package_uris=['gs://hoge/iris_xgboost_trainer-0.0.0.tar.gz'],
        training_python_module='iris_xgboost_trainer.iris',
        region='us-central1',
        scale_tier='BASIC',
        task_id='test-training',
        runtime_version='1.4',
        python_version='2.7'
        training_args='')

ソースコード中の、iris_xgboost_trainer-0.0.0.tar.gz はxgboostを使ったpythonのトレーニング用パッケージです。

runtime_version='1.4'を指定しているので、xgboostが使えるはずです。
(参考:ランタイムバージョンと内包するパッケージ)
https://cloud.google.com/ml-engine/docs/tensorflow/runtime-version-list?hl=ja

調査

Cloud Composerのログを見ると実行引数にruntime_version, python_versionが渡されていないことがわかりました。

Running task with arguments: --cluster={"master": ["127.0.0.1:2222"]} --task={"type": "master", "index": 0} --job={ "package_uris": ["gs://hoge/scikit_learn_job_dir/packages/iris_xgboost_trainer-0.0.0.tar.gz"], "python_module": "iris_xgboost_trainer.iris", "args": [""], "region": "us-central1", "run_on_raw_vm": true}

さらに、Airflow側のログを確認したところruntime_version, python_versionが読み込めていない旨のエラーが出ていました。

[2018-09-26 09:17:15,137] {base_task_runner.py:98} INFO - Subtask: /usr/local/lib/python2.7/site-packages/airflow/models.py:2159: PendingDeprecationWarning: Invalid arguments were passed to MLEngineTrainingOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}
[2018-09-26 09:17:15,138] {base_task_runner.py:98} INFO - Subtask: *args: ()@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}
[2018-09-26 09:17:15,138] {base_task_runner.py:98} INFO - Subtask: **kwargs: {'python_version': '2.7', 'runtime_version': '1.8'}@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}

原因

以下の記事にも記載した通り、Cloud ComposerのAirflowバージョンが1.9のためでした。
https://qiita.com/issey-hirano/items/7067c24f084bc05940fa

# 1.10のMLEngineTrainingOperatorの引数
class MLEngineTrainingOperator(BaseOperator):
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 runtime_version=None,
                 python_version=None,
                 job_dir=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):
# 1.9のMLEngineTrainingOperatorの引数
class MLEngineTrainingOperator(BaseOperator):
    def __init__(self,
                 project_id,
                 job_id,
                 package_uris,
                 training_python_module,
                 training_args,
                 region,
                 scale_tier=None,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
                 mode='PRODUCTION',
                 *args,
                 **kwargs):

この通り、1.9ではruntime_version、python_versionが指定できません。
そのため、APIのデフォルトである、runtime_version=1.0で実行されてしいます。

runtime_version=1.0はxgboostがパッケージされていないため、ImportError: No module named xgboost となるようです。

回避

airflow.operators.bash_operator.BashOperatorを使って、gcloud ml-engine jobs submit trainingコマンドを叩く力業で無事トレーニングに成功しました。

import datetime

from airflow import models
from airflow.operators import bash_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())
now = datetime.datetime.now()

default_dag_args  = {
    'start_date': yesterday,
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'iris_train_bashoperator',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

    training_op = bash_operator.BashOperator(
        task_id='test_training',
        bash_command='gcloud ml-engine jobs submit training test_training_$(date +"%Y%m%d_%H%M%S") \
            --module-name iris_xgboost_trainer.iris \
            --staging-bucket gs://zozo-ml/ \
            --region us-central1 \
            --packages gs://hoge/iris_xgboost_trainer-0.0.0.tar.gz \
            --runtime-version 1.4 \
            --python-version 2.7 \
            --scale-tier BASIC'
        )

最後に

はやくAirflow1.10に対応してほしいですね。
Cloud Composerは利用者がまだ少なく、日本語記事もほとんどないため、参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0