6
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

WorkflowAdvent Calendar 2019

Day 21

Cloud Composer(Airflow)でAutoMLを使った推論パイプラインを組んでみた

Last updated at Posted at 2019-12-21

Overview.

GCPのCloud ComposerでAirflow環境を立ち上げAutoMLOperatorを使ってBigQueryからテーブルデータを抽出して、予測するフローを組んでみました。
普通にやれば行けるかと思ったら結構躓きました。

躓いたポイント

  • AutoMLOperatorはAirflow1.10.6からだが、CloudComposerのAirflowイメージは1.10.3までしか対応していない
  • AutoMLOperatorのリファレンスがスカスカすぎてgoogle-cloud-automlのライブラリのソースまで漁りました

どうやって解決したか

  • 依存関係のあるパッケージをCloud Composerの環境にインストール
  • AirflowのパッケージはCloud Composer環境にインストールできなかったので、ソースをとってきてpluginとして使った

前提

  • GCPプロジェクト課金済み(支払いアカウント紐づき済み)
    • モデルの学習・デプロイには課金が発生します
  • AutoMLモデルデプロイ済み
    • 適当なデータセットで適当なモデルを作成してデプロイしておきます
    • 予測に用いるテストデータをBigQueryテーブルにセットしておきます

Cloud ComposerでAirflow環境を構築する

環境設定

基本設定は以下のようにしました。

  • ゾーン: us-central1-c
  • Google API scopes: https://www.googleapis.com/auth/cloud-platform
  • Image version: composer-1.8.3-airflow-1.10.3
  • Python version: 3
  • ネットワーク タグ: なし
  • Node count: 3
  • Disk size (GB): 100
  • マシンタイプ: n1-standard-1

ここからがAirflow1.10.3未対応のAutoMLOperatorに対応するためのちょっとした工夫。
CloudComposerのコンソールのPyPl PACKAGESタブで、cached_propertygoogle-cloud-automl >=0.9.0パッケージを追加しておく。(この環境の変更には20~30分くらいかかる)
cap1.PNG

Airflow1.10.3ではデフォルトのAirflowパッケージの中にgcpモジュールがないのでAirflow1.10.6のGitHubリポジトリからAutoMLのOperatorだけ抽出してpluginとして置いてつかう。
リポジトリをcloneします。その中のAutoMLOperatorと依存関係のあるファイル

  • airflow/airflow/gcp/operators/automl.py
  • airflow/airflow/gcp/hooks/automl.py
  • airflow/airflow/gcp/hooks/base.py

だけ抽出します。

airflow
 ┗ gcp
   ┣ operators
   ┃  ┗ automl.py
   ┗ hooks
      ┣ automl.py
      ┗ base.py

という感じにまとめておきます。
これをairflowフォルダごとCloudComposer環境用のGCSバケットのpluginsフォルダに置いておく。

DAGを準備

とりあえずBigQueryからCloud Storageにテストデータセットを抽出してAutoMLのpredictにかけ、結果をGCSに吐き出すところまでをワークフロー化してみます。
データフローとしては、
BigQuery -> Cloud Storage -> AutoML -> Cloud Storage
という感じ。
(AutoMLデータセット作成&学習&デプロイ部分は時間がなかったので割愛)

automl_pipeline.py
# coding: utf-8

from datetime import datetime, timedelta
from os import path 

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
# from plugins/
from airflow.gcp.operators.automl import AutoMLBatchPredictOperator
#set paths
gcp_project = 'YOUR_PROJECT_NAME'
gcs_bucket = 'YOUR_BUCKET'
bq_dataset = 'YOUR_DATASET_NAME'
bq_table = 'YOUR_TABLE_NAME'
gcs_prefix = path.join('gs://', gcs_bucket, '')
gcp_location = 'us-central1'

# set AutoML model ID
model_id = 'TBL386XXXXXXXXXXX12' # AutoML model ID
# test dataset: `YOUR_PROJECT_NAME:YOUR_DATASET_NAME.YOUR_TABLE_NAME`
dataset_table = f'{gcp_project}:{bq_dataset}.{bq_table}'
# gcs path 
storage_uri_path = 'automl_data/input'
# set default conn id
gcp_conn_id = 'google_cloud_default'

default_args = {
    'owner': 'OWNER_NAME',
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
    'start_date': airflow.utils.dates.days_ago(1)
    }

with DAG(
    dag_id='automl_predict',
    default_args=default_args,
    schedule_interval=None
    ) as dag:

    # extract dataset from bq to gcs
    bq_to_gcs_op = BigQueryToCloudStorageOperator(
        task_id='bq_to_gcs',
        bigquery_conn_id=gcp_conn_id,
        source_project_dataset_table=dataset_table,
        destination_cloud_storage_uris=[path.join(gcs_prefix, storage_uri_path, 'test.csv')],
        dag=dag
        )

    # set input data path
    input_data = f'gs://{gcs_bucket}/{storage_uri_path}/test.csv'
    input_config = {'gcs_source': {'input_uris': [input_data]}}
    # set output data path prefix
    output_data = f'gs://{gcs_bucket}/{storage_uri_path}/'.replace('input', 'output')
    output_config = {'gcs_destination': {'output_uri_prefix': outputdata}}

    # AutoML predict
    automl_predict_op = AutoMLBatchPredictOperator(
        task_id='predict',
        model_id=model_id,
        input_config=input_config,
        output_config=output_config,
        location=gcp_location,
        project_id=gcp_project,
        dag=dag
        )

    bq_to_gcs_op >> automl_predict_op

if __name__ == '__main__':
    dag.cli()

全体はこんな感じ。

これでDAGをトリガーすればGCSのgs://YOUR_BUCKET/automl_dataset/output/に予測結果がCSVで吐き出される。

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?