問題
Cloud ComposerでMLEngineTrainingOperatorを使ってxgboostのトレーニングジョブを実行した際、以下のエラーがでたので調査しました。
import xgboost as xgb ImportError: No module named xgboost
ソースコード
import datetime
from airflow import models
from airflow.contrib.operators.mlengine_operator import (MLEngineTrainingOperator)
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
now = datetime.datetime.now()
default_dag_args = {
'start_date': yesterday,
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
with models.DAG(
'iris_train',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
training_op = MLEngineTrainingOperator(
project_id='durable-binder-547',
job_id='test_training_' + now.strftime("%Y%m%d%H%M%S"),
package_uris=['gs://hoge/iris_xgboost_trainer-0.0.0.tar.gz'],
training_python_module='iris_xgboost_trainer.iris',
region='us-central1',
scale_tier='BASIC',
task_id='test-training',
runtime_version='1.4',
python_version='2.7'
training_args='')
ソースコード中の、iris_xgboost_trainer-0.0.0.tar.gz
はxgboostを使ったpythonのトレーニング用パッケージです。
runtime_version='1.4'
を指定しているので、xgboostが使えるはずです。
(参考:ランタイムバージョンと内包するパッケージ)
https://cloud.google.com/ml-engine/docs/tensorflow/runtime-version-list?hl=ja
調査
Cloud Composerのログを見ると実行引数にruntime_version, python_versionが渡されていないことがわかりました。
Running task with arguments: --cluster={"master": ["127.0.0.1:2222"]} --task={"type": "master", "index": 0} --job={ "package_uris": ["gs://hoge/scikit_learn_job_dir/packages/iris_xgboost_trainer-0.0.0.tar.gz"], "python_module": "iris_xgboost_trainer.iris", "args": [""], "region": "us-central1", "run_on_raw_vm": true}
さらに、Airflow側のログを確認したところruntime_version, python_versionが読み込めていない旨のエラーが出ていました。
[2018-09-26 09:17:15,137] {base_task_runner.py:98} INFO - Subtask: /usr/local/lib/python2.7/site-packages/airflow/models.py:2159: PendingDeprecationWarning: Invalid arguments were passed to MLEngineTrainingOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}
[2018-09-26 09:17:15,138] {base_task_runner.py:98} INFO - Subtask: *args: ()@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}
[2018-09-26 09:17:15,138] {base_task_runner.py:98} INFO - Subtask: **kwargs: {'python_version': '2.7', 'runtime_version': '1.8'}@-@{"task-id": "test-training", "execution-date": "2018-09-26T09:17:06.402134", "workflow": "iris_train"}
原因
以下の記事にも記載した通り、Cloud ComposerのAirflowバージョンが1.9のためでした。
https://qiita.com/issey-hirano/items/7067c24f084bc05940fa
# 1.10のMLEngineTrainingOperatorの引数
class MLEngineTrainingOperator(BaseOperator):
def __init__(self,
project_id,
job_id,
package_uris,
training_python_module,
training_args,
region,
scale_tier=None,
runtime_version=None,
python_version=None,
job_dir=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
mode='PRODUCTION',
*args,
**kwargs):
# 1.9のMLEngineTrainingOperatorの引数
class MLEngineTrainingOperator(BaseOperator):
def __init__(self,
project_id,
job_id,
package_uris,
training_python_module,
training_args,
region,
scale_tier=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
mode='PRODUCTION',
*args,
**kwargs):
この通り、1.9ではruntime_version、python_versionが指定できません。
そのため、APIのデフォルトである、runtime_version=1.0で実行されてしいます。
runtime_version=1.0はxgboostがパッケージされていないため、ImportError: No module named xgboost
となるようです。
回避
airflow.operators.bash_operator.BashOperator
を使って、gcloud ml-engine jobs submit training
コマンドを叩く力業で無事トレーニングに成功しました。
import datetime
from airflow import models
from airflow.operators import bash_operator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
now = datetime.datetime.now()
default_dag_args = {
'start_date': yesterday,
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
with models.DAG(
'iris_train_bashoperator',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
training_op = bash_operator.BashOperator(
task_id='test_training',
bash_command='gcloud ml-engine jobs submit training test_training_$(date +"%Y%m%d_%H%M%S") \
--module-name iris_xgboost_trainer.iris \
--staging-bucket gs://zozo-ml/ \
--region us-central1 \
--packages gs://hoge/iris_xgboost_trainer-0.0.0.tar.gz \
--runtime-version 1.4 \
--python-version 2.7 \
--scale-tier BASIC'
)
最後に
はやくAirflow1.10に対応してほしいですね。
Cloud Composerは利用者がまだ少なく、日本語記事もほとんどないため、参考になれば幸いです。