Overview.
GCPのCloud ComposerでAirflow環境を立ち上げAutoMLOperatorを使ってBigQueryからテーブルデータを抽出して、予測するフローを組んでみました。
普通にやれば行けるかと思ったら結構躓きました。
躓いたポイント
- AutoMLOperatorはAirflow1.10.6からだが、CloudComposerのAirflowイメージは1.10.3までしか対応していない
- AutoMLOperatorのリファレンスがスカスカすぎてgoogle-cloud-automlのライブラリのソースまで漁りました
どうやって解決したか
- 依存関係のあるパッケージをCloud Composerの環境にインストール
- AirflowのパッケージはCloud Composer環境にインストールできなかったので、ソースをとってきてpluginとして使った
前提
- GCPプロジェクト課金済み(支払いアカウント紐づき済み)
- モデルの学習・デプロイには課金が発生します
- AutoMLモデルデプロイ済み
- 適当なデータセットで適当なモデルを作成してデプロイしておきます
- 予測に用いるテストデータをBigQueryテーブルにセットしておきます
Cloud ComposerでAirflow環境を構築する
環境設定
基本設定は以下のようにしました。
- ゾーン: us-central1-c
- Google API scopes: https://www.googleapis.com/auth/cloud-platform
- Image version: composer-1.8.3-airflow-1.10.3
- Python version: 3
- ネットワーク タグ: なし
- Node count: 3
- Disk size (GB): 100
- マシンタイプ: n1-standard-1
ここからがAirflow1.10.3未対応のAutoMLOperatorに対応するためのちょっとした工夫。
CloudComposerのコンソールのPyPl PACKAGESタブで、cached_property
とgoogle-cloud-automl >=0.9.0
パッケージを追加しておく。(この環境の変更には20~30分くらいかかる)
Airflow1.10.3ではデフォルトのAirflowパッケージの中にgcp
モジュールがないのでAirflow1.10.6のGitHubリポジトリからAutoMLのOperatorだけ抽出してpluginとして置いてつかう。
リポジトリをcloneします。その中のAutoMLOperatorと依存関係のあるファイル
airflow/airflow/gcp/operators/automl.py
airflow/airflow/gcp/hooks/automl.py
airflow/airflow/gcp/hooks/base.py
だけ抽出します。
airflow
┗ gcp
┣ operators
┃ ┗ automl.py
┗ hooks
┣ automl.py
┗ base.py
という感じにまとめておきます。
これをairflowフォルダごとCloudComposer環境用のGCSバケットのpluginsフォルダに置いておく。
DAGを準備
とりあえずBigQueryからCloud Storageにテストデータセットを抽出してAutoMLのpredictにかけ、結果をGCSに吐き出すところまでをワークフロー化してみます。
データフローとしては、
BigQuery -> Cloud Storage -> AutoML -> Cloud Storage
という感じ。
(AutoMLデータセット作成&学習&デプロイ部分は時間がなかったので割愛)
# coding: utf-8
from datetime import datetime, timedelta
from os import path
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
# from plugins/
from airflow.gcp.operators.automl import AutoMLBatchPredictOperator
#set paths
gcp_project = 'YOUR_PROJECT_NAME'
gcs_bucket = 'YOUR_BUCKET'
bq_dataset = 'YOUR_DATASET_NAME'
bq_table = 'YOUR_TABLE_NAME'
gcs_prefix = path.join('gs://', gcs_bucket, '')
gcp_location = 'us-central1'
# set AutoML model ID
model_id = 'TBL386XXXXXXXXXXX12' # AutoML model ID
# test dataset: `YOUR_PROJECT_NAME:YOUR_DATASET_NAME.YOUR_TABLE_NAME`
dataset_table = f'{gcp_project}:{bq_dataset}.{bq_table}'
# gcs path
storage_uri_path = 'automl_data/input'
# set default conn id
gcp_conn_id = 'google_cloud_default'
default_args = {
'owner': 'OWNER_NAME',
'retries': 1,
'retry_delay': timedelta(seconds=10),
'start_date': airflow.utils.dates.days_ago(1)
}
with DAG(
dag_id='automl_predict',
default_args=default_args,
schedule_interval=None
) as dag:
# extract dataset from bq to gcs
bq_to_gcs_op = BigQueryToCloudStorageOperator(
task_id='bq_to_gcs',
bigquery_conn_id=gcp_conn_id,
source_project_dataset_table=dataset_table,
destination_cloud_storage_uris=[path.join(gcs_prefix, storage_uri_path, 'test.csv')],
dag=dag
)
# set input data path
input_data = f'gs://{gcs_bucket}/{storage_uri_path}/test.csv'
input_config = {'gcs_source': {'input_uris': [input_data]}}
# set output data path prefix
output_data = f'gs://{gcs_bucket}/{storage_uri_path}/'.replace('input', 'output')
output_config = {'gcs_destination': {'output_uri_prefix': outputdata}}
# AutoML predict
automl_predict_op = AutoMLBatchPredictOperator(
task_id='predict',
model_id=model_id,
input_config=input_config,
output_config=output_config,
location=gcp_location,
project_id=gcp_project,
dag=dag
)
bq_to_gcs_op >> automl_predict_op
if __name__ == '__main__':
dag.cli()
全体はこんな感じ。
これでDAGをトリガーすればGCSのgs://YOUR_BUCKET/automl_dataset/output/
に予測結果がCSVで吐き出される。