LoginSignup
16
15

More than 1 year has passed since last update.

【GCP】Kubeflow Pipelinesを使った機械学習パイプラインの自動化

Last updated at Posted at 2021-08-01

はじめに

 Qiitaへの初投稿です。半年間、独学で機械学習やGCPについて学習したアウトプットとして記事を書きました。間違いなどございましたら、ご指摘いただけると幸いです。随時、加筆修正いたします。今回使用するコードはこちらのレポジトリにあります。

 この記事では、Kubeflow Pipelinesを用いた機械学習パイプラインの自動化について記述します。Courseraの講座をベースにしました(当該GitHubレポジトリはこちら)。設計にはこちらのドキュメントが参考になります。

目的

 パイプラインの実行を自動化することにより、機械学習モデルの運用コストを下げることを目的としています。具体的には、コードの変更後に、モデルの訓練・評価・デプロイなどのタスクを自動で実行されます。また、コンポーネントの追加が容易であるようなパイプラインを構築することで、段階的な機械学習モデルの導入を実現することができると考えられます。今回は、この目的を実現するために、Kubeflow PipelinesGoogle Cloud Platformの各種サービスを利用します。

概要

Architecture

icon.png

 今回は、24時間分の気象データから、翌24時間分の気温の予測を作成するシステムを作ります。前準備として、気象庁から過去の気象データをスクレイピングし、BigQueryにアップロードします。パイプラインの構成には、Kubeflow pipelinesを用います。

 ここから、パイプラインのコンポーネントを説明します。まず、Dataflowを用いて、データの分割・前処理・保存を行います。データの前処理に関しては、TensorFlow Transformを利用します。BigQueryのレコードを時系列に沿って分割抽出し、train, valid, testの3つのデータセットを作成します。train, validセットについては前処理を施し、それぞれのデータセットをCSVファイルに書き込んでCloud Storageへと保存します。

 ここからは、AI Platform上でTensorFlowを使用します。まず、tf.dataでデータセットに時系列データ解析のためのwindow処理を行います。そして、tf.kerasとTensoFlow Addonsでseq2seqモデルを作成します。window処理したデータでこのモデルを訓練します。ハイパーパラメーターチューニングをした後に、再訓練を行い、モデルをTensorFlow Transformの前処理関数と併せて、Cloud Storageへと保存します。さらに、保存したモデルをロードし、テストセットを使って評価します。評価指標が事前設定した閾値より良ければデプロイします。デプロイ時には、カスタム予測ルーチンの設定をします。以上がパイプラインの概要です。

 最後に、上記のパイプラインを実行するためのビルド構成ファイルを準備し、Cloud Buildを利用して、「GitHubのリモートレポジトリへの新しいtagのpush」などをトリガーとして、ビルド構成ファイルを自動実行するように設定します。1日1回のスクレイピングやオンライン予測の作成を実行プログラムをCloud FunctionsCloud Schedulerを用いて実装し、システム全体の完成です。

Requirements

  • Python 3.7
  • TensorFlow 2.5.0
  • TensorFlow Tansform 1.1.0
  • TensorFlow Addons 0.13.0
  • Kubeflow Pipelines
  • GCP
    • AI Platform
    • Cloud Storage
    • BigQuery
    • Dataflow
    • Cloud Build
    • Cloud Functions
    • Cloud Scheduler

ディレクトリ構造

今回使用するディレクトリの構成は次のようになっています。
スクリーンショット 2021-08-01 16.03.26.png

下準備

データセットの作成

使用ファイル
- weather_scrayper.py

 まず始めに、今回使用するデータセットの準備をします。気象庁の過去の気象データ検索からスクレイピングし、BigQueryのテーブルにアップロードします。取得するデータは、東京都の1時間ごとの値で、「時、気圧(現地、海面)、降水量、気温、湿度、風向、風速、日照時間、全天日射量」の列を使用します。本サイトでは、毎日午前2時頃に前日分のデータが更新されます。

スクリーンショット 2021-06-20 13.09.59(2).png

 BeautifulSoupを用いてWebスクレイピングを行います。こちらの記事を参考にしました。ここで、BigQueryのテーブルへの書き込みを検討しましたが、大量の書き込みは不正なリクエストとして検知されてしまいます。そのため、今回はCSVファイルに書き込んでからBigQueryへとアップロードします。取得するデータの期間は、2011年1月1日の午前1時〜プログラム実行日の午前0時です。
 こちらのプログラムを実行後、CSVファイルが作成されたら、BigQueryへとアップロードします。この際、スキーマは自動取得ではなく、以下の様に設定します。詳細設定から、ヘッダーの1行分をスキップします。テーブル名はtokyoとしました。

スクリーンショット 2021-07-29 22.15.13.png

 アップロードジョブが終了したら、レコードの確認をします。日照時間・全天日射量の欠損値は0.0で埋めています。この時点で、特徴量の数は10個となっています。

スクリーンショット 2021-07-29 22.17.49.png

GKEクラスターの作成

 次の手順で、GKEクラスターの作成とデプロイをします。クラスターの作成後、画面最下部よりデプロイします。最後に、こちらを参考に、Kuberneteシークレットを使用して、クラスタにGoogle CloudリソースとAPIへのアクセスを許可します。比較的時間のかかるトレーニングジョブを実行した際に、BrokenPipeLineが出てしまうので、それを防ぐためにこの設定をします。エラーの詳細は、こちらのissueが参考になります。

スクリーンショット 2021-06-24 15.20.42.png

スクリーンショット 2021-06-24 15.21.24.png

スクリーンショット 2021-08-01 11.19.52.png

Kubeflow Pipelines

使用ファイル:
- pipeline/model_training_pipeline.py

 今回パイプラインの作成に使用するKubeflow Pipelineについては、こちらのドキュメントが参考になります。

Kubeflow Pipelines は、以下で構成されます。
・一連のコンテナ化された ML タスク、またはコンポーネント。パイプライン コンポーネントは、Docker イメージとしてパッケージ化された自己完結型のコードです。コンポーネントは、入力引数を受け取り、出力ファイルを生成し、パイプラインで 1 つのステップを実行します。
・Python ドメイン固有の言語(DSL)で定義された ML タスクのシーケンスの仕様。ワークフローのトポロジは、上流ステップの出力を下流ステップの入力に接続することによって暗黙的に定義されます。パイプライン定義のステップは、パイプライン内のコンポーネントを呼び出します。複雑なパイプラインでのコンポーネントは、ループで複数回実行される場合や、条件付きで実行される場合があります。
・パイプライン入力パラメータのセット。値はパイプラインのコンポーネントに渡されます。データのフィルタリング条件や、パイプラインが生成するアーティファクトの保存場所が含まれます。

 パイプライン全体はこちらのファイルをご覧ください。今回は、事前定義されたGoogle Cloudコンポーネント軽量のpythonコンポーネントを使用します。事前定義されたGoogle Cloudコンポーネントについては、上記の記事で次のように説明されています。

事前定義された Google Cloud コンポーネントの使用: Kubeflow Pipelines は、必須パラメータを指定することで、Google Cloud でさまざまなマネージド サービスを実行する事前定義されたコンポーネントを提供します。これらのコンポーネントは、BigQuery、Dataflow、Dataproc、AI Platform などのサービスを使用したタスクの実行に役立ちます。これらの事前定義された Google Cloud コンポーネントは、AI Hub でも利用できます。再利用可能なコンポーネントを使用する場合と同様に、これらの component op は、ComponentStore.load_components を通じて事前定義されたコンポーネント仕様から自動的に作成されます。その他の事前定義されたコンポーネントは、Kubeflow などのプラットフォームでジョブを実行するために使用できます。

 GCPから利用できる事前定義されたコンポーネントは、こちらから確認できます。今回使用するのは、AI platform上でのトレーニングジョブとトレーニング済みモデルのデプロイを行うコンポーネントです。load_conponents関数で指定するのは、次のような、conponents.yamlを含むディレクトリです。COMPONENT_URL_SEARCH_PREFIXに続くpathを指定しましょう。同時に、READEME.mdからコンポーネントの必須パラメーターや実行後の出力について確認できます。(参照:https://github.com/kubeflow/pipelines/tree/master/components/gcp/ml_engine/train)

model_training_pipeline.py
import kfp

# COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/gcp/'
COMPONENT_URL_SEARCH_PREFIX = os.getenv('COMPONENT_URL_SEARCH_PREFIX')

# Create component factories        
component_store = kfp.components.ComponentStore(
    local_search_paths=None, url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])

# Predefined components
# component.yamlがあるディレクトリを指定
mlengine_train_op = component_store.load_component('ml_engine/train')
mlengine_deploy_op = component_store.load_component('ml_engine/deploy')

 次に、軽量のPythonコンポーネントについてです。

軽量の Python コンポーネントの実装: このコンポーネントでは、コードを変更するたびに新しいコンテナ イメージを作成する必要はなく、ノートブック環境での迅速なイテレーションを対象としています。kfp.components.func_to_container_op 関数を使用して、Python 関数から軽量コンポーネントを作成できます。

今回は以下のように、func_to_container_op関数によって3つのコンポーネントを作成しています。

model_training_pipeline.py
from helper_components import retrieve_best_run
from helper_components import evaluate_model
from preprocess_dataflow_pipeline import run_transformation_pipeline

import kfp
from kfp.components import func_to_container_op

TRANSFORM_BASE_IMAGE = os.getenv('TRANSFORM_BASE_IMAGE') # docker_images/transform_image/Dockerfile
BASE_IMAGE = os.getenv('BASE_IMAGE') # docker_images/base_image/Dockerfile
EVALUATE_IMAGE = os.getenv('EVALUATE_IMAGE') # docker_images/evaluate_image/Dockerfile

# Lightweight components
run_transform_pipeline_op = func_to_container_op(
    run_transformation_pipeline, base_image=TRANSFORM_IMAGE)
retrieve_best_run_op = func_to_container_op(
    retrieve_best_run, base_image=BASE_IMAGE)
evaluate_model_op = func_to_container_op(
    evaluate_model, base_image=EVALUATE_IMAGE)

 それでは、ここからそれぞれのコンポーネントに必要なファイルを記述します。今回は、事前定義されたGoogle Cloudコンポーネントを2つ、軽量なPythonコンポーネントを3つ作成したので、以下では5つのステップに分けて説明します。最後に、5つのコンポーネントからパイプラインの作成をします。

データ抽出〜保存

dataflow.png

使用ファイル:
- pipeline/preprocess_dataflow_pipeline.py

 1番目のステップです。ここでは、データセットの作成と変換を行うrun_transform_pipeline_opで使用するファイルを作成し、Dataflowを用いて以下のフローを実行します。

  • BigQueryからのデータ抽出
  • Tensorflow Transformでの前処理
  • Cloud Storageへの保存

 それぞれの処理をApache Beamのプログラミングモデルを用いて記述し、最後に一連の処理を行うApache Beamパイプラインを作成します。このステップで実行されるrun_transformation_pipeline関数は、こちらから確認できます。

データ抽出

datetime.png

 まず、BigQueryのテーブルからデータの抽出を行い、Train,Valid,Testの3つのデータセットを作成します。後述しますが、1日1回のスクレイピングでテーブルにレコードを自動追加されます。そのため、プログラムの実行タイミングによって、3つのデータセットにおけるデータ量の比率が変わらないよう、動的に変化するクエリを記述します。ここではUNIX時間を使って、最初のレコードのDate(2011-01-01 01:00:00)からプログラム実行までの経過時間を80:10:10の割合で分割し、テンプレートクエリ中のstartendの値を定めます。

preprocess_dataflow_pipeline.py
# Generating the query
# train, valid, testセットのデータ量の比率が常に一定であるように、UNIX時間を使ってクエリを変化させる
def generate_sampling_query(source_table_name, step):
    # Setting timestamp division
    start = datetime(2011, 1, 1, 1, 0, 0)
    end = datetime.now()
    diff = end.timestamp() - start.timestamp()

    train_start = start.timestamp()
    train_end = train_start + diff * 0.8
    valid_end = train_end + diff * 0.1
    test_end = valid_end + diff * 0.1

    train_start = datetime.fromtimestamp(train_start)
    train_end = datetime.fromtimestamp(train_end)
    valid_end = datetime.fromtimestamp(valid_end)
    test_end = datetime.fromtimestamp(test_end)

    valid_start = train_end
    test_start = valid_end

    # Template query
    sampling_query_template="""

SELECT
    *
FROM 
    `{{source_table}}`
WHERE
    Date BETWEEN '{{start}}' AND '{{end}}'
ORDER BY 
    Date

    """

    # Changing query dependging on steps
    if step == "Train":
        start, end = train_start, train_end
    elif step == "Valid":
        start, end = valid_start, valid_end
    else:
        start, end = test_start, test_end

    query = Template(sampling_query_template).render(
        source_table=source_table_name, start=start, end=end)

    return query

 次に、読み込み前のデータ形式の変換を行います。まず、Datetime形式のデータは読み取れないので、Date列をtimestamp形式に変換します。また後の処理のために、window_direction列を文字列から、360°表記に変換します。後述するTensorFlow Transformの前処理関数で処理を完結させようとしましたが、DataFrame→ndarray→Tensorという形式変換の際にエラーが出てしまったので、prep_bq_row関数で読み込み前の処理を行いました。

preprocess_dataflow_pipeline.py
def prep_bq_row(bq_row):

    result = {}

    for feature_name in bq_row.keys():
        result[feature_name] = bq_row[feature_name]

    date_time = pd.to_datetime(bq_row["Date"])
    time_stamp = pd.Timestamp(date_time)
    result["Date"] = time_stamp.timestamp()

    wind_direction = tf.strings.regex_replace(bq_row["wind_direction"], "[\s+)]", "")
    wind_direction = tf.strings.regex_replace(wind_direction, "[x]", u"静穏")  

    direction_list = [
        "北", "北北東", "北東", "東北東", "東", "東南東", "南東", "南南東", 
        "南", "南南西", "南西", "西南西", "西", "西北西", "北西", "北北西", "静穏"
    ]
    degree_list = [
        0.0, 22.5, 45.0, 67.5, 90.0, 112.5, 135.0, 157.5,
        180.0, 202.5, 225.0, 247.5, 270.0, 292.5, 315.0, 337.5, 0.0
    ]

    def direction_to_degree(direction):
        if direction in direction_list:
            index = direction_list.index(direction)
            return degree_list[index]
        else:
            return 0.0

    result["wind_direction"] = direction_to_degree(wind_direction)

    return result

 最後に、Apache BeamのI/Oコネクターを用いて、BigQueryからデータの読み込みを行います。

preprocess_dataflow_pipeline.py
def read_from_bq(pipeline, source_table_name, step):

    query = generate_sampling_query(source_table_name, step)

    # Read data from Bigquery
    raw_data = (
        pipeline
        | 'Read{}DatafromBigQuery'.format(step) >> beam.io.Read(beam.io.ReadFromBigQuery(query=query, use_standard_sql=True))
        | 'Preproc{}Data'.format(step) >> beam.Map(prep_bq_row)
    )

    raw_dataset = (raw_data, raw_metadata)

    return raw_dataset

前処理

 ここからは、前項で抽出したデータにTensorFlow Transformを用いて前処理を加えます。

TensorFlow Transform

tf.Transform ライブラリは全走査を必要とする変換に役立ちます。tf.Transform の出力は、インスタンス レベルの変換ロジックだけでなく全走査変換から計算された統計値も表す TensorFlow グラフとしてエクスポートされ、トレーニングおよびサービス提供に利用されます。トレーニングとサービス提供の両方に同じグラフを使用すると、両方の段階で同じ変換が適用されるため、スキューを防止できます。また、tf.Transform は Dataflow のバッチ処理パイプラインで大規模に実行できるため、トレーニング データを事前に準備してトレーニングの効率を改善できます。

 Tensorflow Transformの変換には、AnalyzeフェーズとTransformフェーズとがあります。前者で、未加工のTrainデータを全走査し、変換に必要な数値を取得します。そして、後者でデータの変換を行います。Trainデータから計算された統計量(平均・分散、最小・最大値など)を用いてTrain、Validデータを変換します。また、同じ変換プロセスをサービング後の予測に用いることで、Training-Serving skewを防ぐことができます。

前処理関数の定義

それでは、前処理関数の定義を始めます。こちらのドキュメントを参考にしました。
 Date列は周期性を表現できるように、sin波・cos波を使って変換します。また、wind_directionとwind_velocity列を併せてベクトルへと変換します。また、air_pressure_ashoreとair_pressure_afloatの外れ値をクリップします。最後に、変数の正規化・標準化を行います。temperature列の平均・分散は、モデルの評価やサービング時の予測後のリスケーリングに使用するので、それぞれを1つの特徴量として一緒に書き込みます。(参照:https://github.com/tensorflow/transform/issues/185)

preprocess_dataflow_pipeline.py
    def preprocess_fn(inputs):    
        outputs = {}

        # Date
        timestamp_s = inputs["Date"]

        day = 24 * 60 * 60
        year = 365.2425 * day

        outputs["day_sin"] = tf.sin(timestamp_s * 2 * math.pi / day)
        outputs["day_cos"] = tf.cos(timestamp_s * 2 * math.pi / day)

        outputs["year_sin"] = tf.sin(timestamp_s * 2 * math.pi / year)
        outputs["year_cos"] = tf.cos(timestamp_s * 2 * math.pi / year)

        # Air pressure
        STANDARDIZED_FEATURES_LIST = ["air_pressure_ashore", "air_pressure_afloat"]
        for feature in STANDARDIZED_FEATURES_LIST:
            outputs[feature] = tft.scale_to_0_1(tf.clip_by_value(inputs[feature], 860.0, 1100.0)) # 外れ値のクリップ

        outputs["diff_air_pressure"] = outputs["air_pressure_ashore"] - outputs["air_pressure_afloat"] # 特徴量の作成 

        # Wind
        wind_direction_rad = inputs["wind_direction"] * math.pi / 180.0

        outputs["wind_vector_x"] = inputs["wind_velocity"] * tf.cos(wind_direction_rad)
        outputs["wind_vector_y"] = inputs["wind_velocity"] * tf.sin(wind_direction_rad)

        # Others
        # Normalizing numerical features
        NORMALIZED_FEATURES_LIST = ["precipitation", "temperature", "humidity", "hours_of_daylight", "global_solar_radiation"]
        for feature in NORMALIZED_FEATURES_LIST:
            outputs[feature] = tft.scale_to_z_score(inputs[feature])

        # Calcurating stats of Temperature and Converting to feature
        # preprocess_fn()で変換したデータに、trainセットのtemperature列の平均と分散が追加される
        def feature_from_scalar(value):
            batch_size = tf.shape(input=inputs["temperature"])[0]

            return tf.tile(tf.expand_dims(value, 0), multiples=[batch_size])

        outputs["temp_mean"] = feature_from_scalar(tft.mean(inputs['temperature']))
        outputs["temp_var"] = feature_from_scalar(tft.var(inputs['temperature']))

        return outputs

 上記の前処理関数をTrainセットに適用します。tft_beam.AnalyzeAndTransformDataset関数で、Analyze・Transformの両フェイズを一度に実行します。

preprocess_dataflow_pipeline.py
# Analyze and transform train dataset
def analyze_and_transform(raw_dataset, step):    

    transformed_dataset, transform_fn = (
        raw_dataset
        | tft_beam.AnalyzeAndTransformDataset(preprocess_fn)
    )

    return transformed_dataset, transform_fn

 続いて、Validデータに同様の前処理関数を用いて変換を行います。繰り返しになりますが、ここでの変換には、Trainセットから作成された統計量が使用されます。※Testセットには前処理を行いません。

preprocess_dataflow_pipeline.py
# Transform valid and test dataset
def transform(raw_dataset, transform_fn, step):    

    transformed_dataset = (
        (raw_dataset, transform_fn)
        | '{}Transform'.format(step) >> tft_beam.TransformDataset()
    )

    return transformed_dataset

保存

 最後に、データセットをカンマ区切りのCSV形式に変換し、Apache BeamのI/OコネクターでCloud Storageへと保存します。

preprocess_dataflow_pipeline.py
def to_train_csv(rawdata):   

    # 変換後の特徴量
    TRAIN_CSV_COLUMNS = [
        'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure',
        'precipitation', 'temperature', 'humidity', 'wind_vector_x', 'wind_vector_y',
        'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var'
    ]

    data = ','.join([str(rawdata[k]) for k in TRAIN_CSV_COLUMNS])

    yield str(data)

def to_test_csv(rawdata):

    # 未変換の特徴量
    TEST_CSV_COLUMNS = [
        'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature',
        'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation'
    ]

    data = ','.join([str(rawdata[k]) for k in TEST_CSV_COLUMNS])

    yield str(data)

# Cloud storageへの書き込み
def write_csv(transformed_dataset, location, step):    
    transformed_data, _ = transformed_dataset
    (
    transformed_data
    | '{}Csv'.format(step) >> beam.FlatMap(to_csv)
    | '{}Out'.format(step) >> beam.io.Write(beam.io.WriteToText(location))
    )

 前処理関数と変換に必要な統計量を含んだ変換プロセスを保存します。

preprocess_dataflow_pipeline.py
# 変換プロセスの保存
def write_transform_artefacts(transform_fn, location):
    (
    transform_fn
    | 'WriteTransformArtefacts' >> tft_beam.WriteTransformFn(location)
    )

Dataflowジョブの実行

 以上で、データの分割・前処理・保存を行う関数の定義が終了しました。それでは、以上のプロセスを実行するApache Beamパイプラインを作成し、DataflowRunnerで実行します。
 Kubeflow pipelinesの事前定義されたGoogle Cloudコンポーネントを利用して、Dataflowのジョブを実行できますが、このコンポーネントは呼び出し時に、指定したpythonファイルを1つだけtmpディレクトリにダウンロードして実行するような設定になっているようです[参考]。そのため、Apache Beamパイプラインのoptionsでsetup_fileを指定したとしても、set_up.pyはtmpディレクトリへとダウンロードされず、Dataflowジョブの実行時に、tensorflow-transformについて、"module not found error"が起きてしまいます。そこで今回は、軽量のPythonコンポーネントを使用します。
 それでは上記の関数定義を使って、Dataflowジョブを実行するためのrun_transformation_pipeline関数の定義をしましょう。

preprocess_dataflow_pipeline.py
from typing import NamedTuple

def run_transformation_pipeline(
    source_table_name:str, job_name:str, gcs_root:str, project_id:str, region:str, dataset_location:str
) -> NamedTuple('Outputs', [('training_file_path', str), ('validation_file_path', str), ('testing_file_path', str)]):

    from datetime import datetime
    import os
    import tempfile
    import copy

    import tensorflow as tf    
    import tensorflow_transform as tft
    import tensorflow_transform.beam as tft_beam
    from tfx_bsl.public import tfxio

    import apache_beam as beam
    from jinja2 import Template

    #-------------------------------------------------------------------------------------

    # 上記の関数定義

    #-------------------------------------------------------------------------------------    

    TRAINING_FILE_PATH = 'training/data.csv'
    VALIDATION_FILE_PATH = 'validation/data.csv'
    TESTING_FILE_PATH = 'testing/data.csv'

    options = {
        'staging_location': os.path.join(gcs_root, 'tmp', 'staging'),
        'temp_location': os.path.join(gcs_root, 'tmp'),
        'job_name': job_name,
        'project': project_id,
        'max_num_workers': 3,
        'save_main_session': True,
        'region': region,
        'setup_file': './setup.py', # pipeline内でtensorflow-transformを使えるようにsetup_fileの設定をする
    }

    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    RUNNER = 'DataflowRunner'

    with beam.Pipeline(RUNNER, options=opts) as pipeline:
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):

            # Create training set
            step = "Train"            
            training_file_path = '{}/{}'.format(dataset_location, TRAINING_FILE_PATH)
            tf_record_file_path = dataset_location

            raw_train_dataset = read_from_bq(pipeline, source_table_name, step)
            transformed_train_dataset, transform_fn = analyze_and_transform(raw_train_dataset, step)            
            write_csv(transformed_train_dataset, training_file_path, step)

            # Create validation set
            step = "Valid"            
            validation_file_path = '{}/{}'.format(dataset_location, VALIDATION_FILE_PATH)

            raw_eval_dataset = read_from_bq(pipeline, source_table_name, step)
            transformed_eval_dataset = transform(raw_eval_dataset, transform_fn, step)
            write_csv(transformed_eval_dataset, validation_file_path, step)

            # Create testing set
            step = "Test"
            testing_file_path = '{}/{}'.format(dataset_location, TESTING_FILE_PATH)

            raw_test_dataset = read_from_bq(pipeline, source_table_name, step)
            write_csv(raw_test_dataset, testing_file_path, step)

            # Sarving artefacts
            transform_artefacts_dir = os.path.join(gcs_root,'transform') 
            write_transform_artefacts(transform_fn, transform_artefacts_dir)

    return (training_file_path, validation_file_path, testing_file_path, transform_artefacts_dir)

 以上で、このステップは終了です。BigQueryのテーブルから、Cloud Storageに3つのCSVファイルが作成されました。

データセットのwindow処理~モデルの保存

model_training.png

 2番目のステップです。ここでは、モデルの訓練を行うmlengine_train_opで使用するファイルを作成し、以下のフローを実行します。

  • データセットのwindow処理
  • seq2seqモデルの作成
  • AI PlatformでのHyperparameter Tuningと再トレーニング
  • モデルの保存

 tf.dataを用いてデータセットのwindow処理を行い、TensorFlow Addonsを利用してseq2seqモデルを作成します。そして、AI Platformの訓練ジョブでモデルのHyperparameter Tuningを行い、最もパフォーマンスの良いハイパーパラメーターをセットして、再トレーニングを行います。訓練が終わったら、Cloud Storageへとモデルを保存します。

seq2seqモデル

seq2seq.png

 今回使用するモデルはseq2seqです。こちらの記事を参考にしました。このモデルを使って、24時間分の気象データの入力から、翌日の1時間ごとの気温の予測をします。 seq2seqモデルは、EncoderとDecoderからなるモデルです。Decoderの入力として、前時点での実現値(実際の気温)を使用する方法と、モデルの前時点での予測値を使用する方法とがあります。Teacher forcingでは、訓練時に前時点の実現値、予測時に前時点でのモデルの予測値をを使用します。これは、訓練の収束を速めるために有効ですが、訓練時と予測時でモデルの評価指標に大きな差が出る恐れがあります。今回は、前者の方法から後者の方法へと段階的に切り替えるScheduled Samplingを使用します。まずは、このサンプリング方法に併せてデータセットの処理を行いましょう。

window処理

window.png

使用ファイル:
- docker_images/trainer_image/create_dataset.py

 まず、train, validセットへのwindow処理を行うwindowed_dataset関数の定義をします。ここでは、こちらの記事を参考にしました。まず、48時点のデータを1まとめにして、前半の24時点をEncoderのインプットとします。後半の24時点はDecoderのインプットとアウトプットの作成に使います。訓練時のDecoderのインプットは、図のように最初の要素が0で、以降は前時点のDecoderアウトプットと同じ値を使います。*testセットへの処理は、モデル評価のステップで定義します。

docker_images/trainer_image/create_dataset.py
from functools import partial

import tensorflow as tf

# Setting defaults
CSV_COLUMNS = [
    'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure',
    'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 
    'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var'
]

SELECT_COLUMNS = [
    'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure',
    'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation'
]

DEFAULTS = [[0.0] for _ in range(len(SELECT_COLUMNS))]

# Packing features
def pack(features):
    packed_features =  tf.stack(list(features.values()), axis=1)

    return tf.reshape(packed_features, [-1])

@tf.function
def marshal(x, feature_keys):
    features = {
        k: x[:, feature_keys.index(k)] for k in feature_keys #pack時失われたkeyを付け直す

    }

    return features

# Window processing
def windowed_dataset(dataset, batch_size, mode):

    marshal_fn_partial = partial(marshal, feature_keys=SELECT_COLUMNS) 

    dataset = dataset.map(pack)
    dataset = dataset.window(size=48, shift=1, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(48))

    if mode == "train":
        dataset.shuffle(1000)

    encoder_input = dataset.map(lambda window: window[:24]).map(marshal_fn_partial)
    decoder_input = dataset.map(lambda window: tf.concat((tf.zeros((1)), window[24:-1, 8]), axis=0)) #Teacher Forcingのため、decoder_inputの先頭は、0にする
    decoder_output = dataset.map(lambda window: window[24:, 8])

    inputs = tf.data.Dataset.zip((encoder_input, decoder_input))
    dataset = tf.data.Dataset.zip((inputs, decoder_output)).cache()

    dataset = dataset.batch(batch_size, drop_remainder=True).repeat(1).prefetch(1)  

    return dataset

 tf.data.experimental.make_csv_dataset関数でロードしたデータに、window処理を施します。この際、select_columnsを設定して、temperature列の平均・分散であるtemp_mean/temp_varを省きます。

docker_images/trainer_image/create_dataset.py
# Loading dataset
def load_dataset(filename, batch_size, mode):

    dataset = tf.data.experimental.make_csv_dataset(
            file_pattern=filename,
            column_names=CSV_COLUMNS,
            column_defaults=DEFAULTS,
            select_columns=SELECT_COLUMNS,
            batch_size=1,
            shuffle=False,
            header=False,
            num_epochs=1)

    dataset = windowed_dataset(dataset, batch_size, mode)

    return dataset

 モデルの訓練に使用するデータセットへのwindow処理が終わりました。データの形状は、次の通りです。
- Encoder Input: (batch_size, 24, 14)
- Decoder Input: (batch_size, 24, 1)
- Decoder Output: (batch_size, 24, 1)
※Encoder Inputは、厳密には14のkeyを持ち格要素が(24,)の辞書となっています。

モデルの作成

使用ファイル:
- docker_images/trainer_image/create_model.py

 ここでは、訓練時のモデルを作成するtrain_model関数と、予測時のモデルを作成するpredict_model関数を、tf.kerasとTensorFlow Addonsを使って作成します。まず、訓練時に使用するモデルです。TensorFlow AddonsのScheduledOutputTrainingSamplerを利用してScheduled Smaplingを実装しています。後述しますが、optimizerのlearning_rateとDropout層のDropoutする割合(dropout_rate)はチューニングの対象となっています。

docker_images/trainer_image/create_model.py
import tensorflow as tf
import tensorflow_addons as tfa

# Creating model for training and evaluating
def train_model(num_units=128, learning_rate=0.001, dropout_rate=0.35):

    SELECT_COLUMNS = [
        'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat', 'diff_air_pressure',
        'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y", 'hours_of_daylight', 'global_solar_radiation'
    ]

    # Input layer
    # tf.keras.experimental.SequenceFeaturesによる入力層は、モデルの保存ができず断念
    encoder_input_layers = {
        colname: tf.keras.layers.Input(name=colname, shape=(24, 1), dtype=tf.float32)
        for colname in SELECT_COLUMNS
    }

    pre_model_input = tf.keras.layers.Concatenate(axis=-1, name="concatenate")(encoder_input_layers.values())

    # Encoder
    encoder_lstm = tf.keras.layers.LSTM(num_units, return_sequences=True, name="encoder_lstm1")(pre_model_input)
    encoder_dropout = tf.keras.layers.Dropout(dropout_rate, name="encoder_dropout")(encoder_lstm)
    encoder_output, state_h, state_c = tf.keras.layers.LSTM(num_units, return_state=True, name="encoder_lstm2")(encoder_dropout)
    encoder_state = [state_h, state_c]

    # Scheduled Sampler
    sampler = tfa.seq2seq.sampler.ScheduledOutputTrainingSampler(
        sampling_probability=0.,
        next_inputs_fn=lambda outputs: tf.reshape(outputs, shape=(1, 1))
    )
    sampler.sampling_probability = tf.Variable(0.)

    # Decoder
    decoder_input = tf.keras.layers.Input(shape=(24, 1), name="decoder_input")

    decoder_cell = tf.keras.layers.LSTMCell(num_units, name="decoder_lstm")
    output_layer = tf.keras.layers.Dense(1, name="decoder_output")

    decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, sampler, output_layer=output_layer)
    decoder_output, _, _ = decoder(decoder_input, initial_state=encoder_state, sequence_length=[24])

    final_output = decoder_output.rnn_output

    # Creating model
    model = tf.keras.Model(
        inputs=[encoder_input_layers, decoder_input], outputs=[final_output])

    optimizer = tf.keras.optimizers.RMSprop(learning_rate)
    model.compile(loss="mse", optimizer=optimizer)

    return model, encoder_input_layers, encoder_state, decoder_cell, output_layer, sampler

 かなり見にくいですが、モデルは次のような構造になっています。

model (1).png
 
 次に、上記のモデルの学習済み層を利用して、予測用のモデルを作成します。Functional APIを使用して作成したモデルの保存を試みましたが、エラーの解消ができなかったため、サブクラス化しています。samplerには、Inference Samplerを使用して、予測を次の時点での入力に使用します。trainセットのtemp_mean、temp_var列を使用して、モデルの出力を元のスケールに直し、最終的なアウトプットとしましょう。後述しますが、データセットにTensorFlow Transformの前処理関数を使用すると、変換後のデータには、trainセットのtemperature列の平均・分散を表すtemp_mean・temp_var列が追加されるので、リスケーリングにはこれらを利用できます。

docker_images/trainer_image/create_model.py
# Creating model for prediction
# Functional APIではモデルの保存時にエラーが出るので、サブクラス化する
def predict_model(encoder_input_layers, encoder_state, decoder_cell, output_layer):

    # Encoder Layer Class
    class Inference_Encoder(tf.keras.layers.Layer):
        def __init__(self, encoder_input_layers, encoder_state):
            super().__init__()

            self.model = tf.keras.models.Model(inputs=[encoder_input_layers], outputs=encoder_state)

        @tf.function
        def call(self, inputs):

            return self.model(inputs)

    # Decoder Layer Class
    class Inference_Decoder(tf.keras.layers.Layer):

        def __init__(self, decoder_cell, output_layer):
            super().__init__()

            # Inference sampler
            self.sampler = tfa.seq2seq.sampler.InferenceSampler(
                sample_fn = lambda outputs: tf.reshape(outputs, (1, 1)),
                sample_shape = [1],
                sample_dtype = tf.float32,
                end_fn = lambda sample_ids : False,
            )

            self.decoder = tfa.seq2seq.basic_decoder.BasicDecoder(
                decoder_cell, self.sampler, output_layer=output_layer, maximum_iterations=24
            )

        @tf.function
        def call(self, initial_state):
            start_inputs = tf.zeros(shape=(1, 1))
            decoder_output, _, _ = self.decoder(start_inputs, initial_state=initial_state)
            final_output = decoder_output.rnn_output

            return final_output

    # Inference Model Class
    class Inference_Model(tf.keras.Model):
        def __init__(self, encoder_input_layers, encoder_state, decoder_cell, output_layer):
            super().__init__()

            self.encoder = Inference_Encoder(encoder_input_layers, encoder_state)
            self.decoder = Inference_Decoder(decoder_cell, output_layer)

        @tf.function
        def call(self, inputs):

            inputs_copy = inputs.copy()

            # inputsは、transform_fnで処理したデータで、訓練セットのtemperature列の平均・分散が含まれている
            # rescaleのために、それらの統計量を取り出しておく
            temp_mean = inputs_copy.pop('temp_mean')[0][0]
            temp_var = inputs_copy.pop('temp_var')[0][0]

            initial_state = self.encoder(inputs_copy)
            outputs = self.decoder(initial_state)

            outputs_rescaled = outputs * tf.sqrt(temp_var) + temp_mean

            return outputs_rescaled

    inference_model = Inference_Model(encoder_input_layers, encoder_state, decoder_cell, output_layer)

    return inference_model

モデルの保存

使用ファイル:
- docker_images/trainer_image/saved_model.py

 ここでは、モデルをCloud Storageへと保存します。サービング時、TensorFlow Transformの前処理関数を適用できるようにモデルにpreprocessing_layerを追加します。(参考:https://github.com/tensorflow/tfx/issues/2199)

docker_images/trainer_image/saved_model.py
import tensorflow as tf

def export_serving_model(model, tf_transform_output, out_dir):

    TRANSFORM_FEATURE_COLUMNS = [
        'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature',
        'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation'
    ]

    SELECT_COLUMNS = [
        'day_sin', 'day_cos', 'year_sin', 'year_cos', 'air_pressure_ashore', 'air_pressure_afloat',
        'diff_air_pressure', 'precipitation', 'temperature', 'humidity', 'wind_vector_x', "wind_vector_y",
        'hours_of_daylight', 'global_solar_radiation', 'temp_mean', 'temp_var'
    ]

    # Building Model
    example = {
        x: tf.random.uniform(shape=(1, 24), name=x)
        for x in SELECT_COLUMNS
    }
    ex = model(example)

    # Transform raw features
    def get_apply_tft_layer(tf_transform_output):

        tft_layer = tf_transform_output.transform_features_layer()

        @tf.function
        def apply_tf_transform(raw_features_dict):

            unbatched_raw_features = {
                k: tf.squeeze(tf.reshape(v, (1, -1)))
                for k, v in raw_features_dict.items()
            }

            transformed_dataset = tft_layer(unbatched_raw_features)

            expanded_dims = {
                k: tf.reshape(v, (-1, 24))
                for k, v in transformed_dataset.items()
            }

            return expanded_dims

        return apply_tf_transform

    def get_serve_raw_fn(model, tf_transform_output):

        model.preprocessing_layer = get_apply_tft_layer(tf_transform_output)

        @tf.function
        def serve_raw_fn(features):

            preprocessed_features = model.preprocessing_layer(features)

            return preprocessed_features

        return serve_raw_fn

    serving_raw_entry = get_serve_raw_fn(model, tf_transform_output)   

    serving_transform_signature_tensorspecs = {
        x: tf.TensorSpec(shape=[None, 24], dtype=tf.float32, name=x)
        for x in TRANSFORM_FEATURE_COLUMNS
    }

    serving_signature_tensorspecs = {
        x: tf.TensorSpec(shape=[None, 24], dtype=tf.float32, name=x)
        for x in SELECT_COLUMNS
    }

    # Signatures
    signatures = {'serving_default': model.call.get_concrete_function(serving_signature_tensorspecs),
                  'transform': serving_raw_entry.get_concrete_function(serving_transform_signature_tensorspecs)}

    tf.keras.models.save_model(model=model, filepath=out_dir, signatures=signatures)

モデルの訓練

使用ファイル:
- docker_images/trainer_image/train.py

 ここでは、上述のcreate_dataset.pycreate_model.pysaved_model.pyを使って、seq2seqモデルの訓練と保存を行うtrain_evaluate関数を定義します。Hyperparameter Tuningを行う際には、evalセットでのmseを用いてモデルの性能を評価します。チューニング後に再訓練をしたら、学習済み層を用いて予測モデルを作成し、モデルを保存します。

docker_images/trainer_image/train.py
import tensorflow as tf
from tensorflow.keras.optimizers import RMSprop
import tensorflow_transform as tft
import fire
import hypertune

from create_dataset import load_dataset
from create_model import train_model
from create_model import predict_model
from save_model import export_serving_model

# Training and evaluating the model
def train_evaluate(job_dir, training_dataset_path, validation_dataset_path, num_epochs, num_units, learning_rate, dropout_rate, hptune, transform_artefacts_dir):

    training_dataset = load_dataset(training_dataset_path + "*", 256, "train")
    validation_dataset = load_dataset(validation_dataset_path + "*", 128, "eval")

    print('Starting training: learning_rate={}, dropout_rate={}'.format(learning_rate, dropout_rate))

    tf_transform_output = tft.TFTransformOutput(transform_artefacts_dir)

    model, encoder_input_layers, encoder_state, decoder_cell, output_layer, sampler = train_model(
        num_units=num_units, learning_rate=learning_rate, dropout_rate=dropout_rate
    )

    def update_sampling_probability(epoch, logs):
        eps = 1e-16
        proba = max(0.0, min(1.0, epoch / (num_epochs - 10 + eps)))
        sampler.sampling_probability.assign(proba)

    sampling_probability_cb = tf.keras.callbacks.LambdaCallback(on_epoch_begin=update_sampling_probability)

    history = model.fit(training_dataset,
            epochs=num_epochs,
            validation_data=validation_dataset,
            callbacks=[sampling_probability_cb]
            )

    # Hyperparameter tuning
    if hptune:
        val_loss = history.history["val_loss"]
        print("val_loss: {}".format(val_loss))

        hpt = hypertune.HyperTune()
        hpt.report_hyperparameter_tuning_metric(
            hyperparameter_metric_tag='val_loss',
            metric_value=val_loss[-1])

    # Saving the model
    if not hptune:
        inference_model_dir = '{}/predict'.format(job_dir)
        inference_model = predict_model(encoder_input_layers, encoder_state, decoder_cell, output_layer)
        export_serving_model(inference_model, tf_transform_output, inference_model_dir)

        print('Inference model saved in: {}'.format(inference_model_dir))

# Execution 
if __name__ == '__main__':    
    fire.Fire(train_evaluate)

チューニング結果の取得

使用ファイル:
- pipeline/helper_components.py

 3つ目のステップです。Hyperparameter Tuningの終了後に、最もパフォーマンスが良かったパラメーターの組み合わせを取得するretrieve_best_run_opで使用するファイルを作成します。訓練ジョブの結果には、REST APIから取得できます。

 ベストパフォーマンスのlearning_rateとdropout_rateの値を取得し、再訓練時に使用します。

pipeline_helper_components.py
from typing import NamedTuple

def retrieve_best_run(
    project_id:str, job_id:str
) -> NamedTuple('Outputs', [('metric_value', float), ('learning_rate', float), ('dropout_rate', float)]):

    from googleapiclient import discovery
    from googleapiclient import errors

    ml = discovery.build('ml', 'v1')

    job_name = 'projects/{}/jobs/{}'.format(project_id, job_id)
    request = ml.projects().jobs().get(name=job_name)

    try:
        response = request.execute()
        print(response)   
    except errors.HttpError as err:
        print(err)
    except:
        print('Unexpected error')    

    best_trial = response['trainingOutput']['trials'][0]

    print("best_trial:", best_trial)

    metric_value = best_trial['finalMetric']['objectiveValue']
    learning_rate = float(best_trial['hyperparameters']['learning_rate'])
    dropout_rate = float(best_trial['hyperparameters']['dropout_rate'])

    return (metric_value, learning_rate, dropout_rate)

モデルの評価

使用ファイル:
- pipelines/helper_components.py
- docker_images/evaluate_image/create_dataset.py

4つ目のステップです。ここでは、モデルの評価を行うevaluate_model_opで使用するファイルを作成します。 まずは評価に使用するデータセットの作成についてです。訓練時とは違いDecoderへのインプットはありません。その他は、訓練時と同様です。

docker_images/evaluate_image/create_dataset.py

import tensorflow as tf

from functools import partial


# Loading dataset
def load_test_dataset(filename, batch_size):

    CSV_COLUMNS = [
        'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature',
        'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation',
        'weather', 'cloud cover'
    ]

    SELECT_COLUMNS = [
        'Date', 'air_pressure_ashore', 'air_pressure_afloat', 'precipitation', 'temperature',
        'humidity', 'wind_direction', 'wind_velocity', 'hours_of_daylight', 'global_solar_radiation'
    ]

    DEFAULTS = [[0.0] for i in SELECT_COLUMNS]

    # Packing features
    def pack(features):
        packed_features =  tf.stack(list(features.values()), axis=1)

        return tf.reshape(packed_features, [-1])

    @tf.function
    def marshal(x, feature_keys):
        features = {
            k: x[:, feature_keys.index(k)] for k in feature_keys
        }

        return features

    # Window processing
    def windowed_dataset(dataset, batch_size):

        marshal_fn_partial = partial(marshal, feature_keys=SELECT_COLUMNS) 

        dataset = dataset.map(pack)
        dataset = dataset.window(size=48, shift=1, drop_remainder=True)
        dataset = dataset.flat_map(lambda window: window.batch(48))

        x_test = dataset.map(lambda window: window[:24]).map(marshal_fn_partial).batch(batch_size, drop_remainder=True).repeat(1).prefetch(1)  
        y_true = dataset.map(lambda window: window[24:, 4]).batch(batch_size, drop_remainder=True).repeat(1).prefetch(1)  

        return x_test, y_true

    dataset = tf.data.experimental.make_csv_dataset(
            file_pattern=filename,
            column_names=CSV_COLUMNS,
            column_defaults=DEFAULTS,
            select_columns=SELECT_COLUMNS,
            header=False,
            batch_size=1,
            shuffle=False,
            num_epochs=1
    )

    x_test, y_true = windowed_dataset(dataset, batch_size)

    return x_test, y_true

 ロードしたデータを、モデルのpreprocessing_layerで前処理します。その後、予測値(y_pred)と実際の気温(y_true)とを用いてmseを計算します。

pipeline/helper_components.py
# モデルの予測値と、実現値を使ってモデルの評価を行う
def evaluate_model(
    dataset_path: str, model_path: str, transform_artefacts_dir: str, metric_name: str
) -> NamedTuple('Outputs', [('metric_name', str), ('metric_value', float), ('mlpipeline_metrics', 'Metrics')]):

    import json

    import tensorflow as tf
    import numpy as np

    from create_dataset import load_test_dataset


    def calculate_loss(y_pred, y_true):

        mse = tf.keras.losses.MeanSquaredError()

        return mse(y_true, y_pred).numpy().astype(np.float64)

    model_path = '{}/predict'.format(model_path)
    model = tf.keras.models.load_model(model_path)

    x_test, y_true = load_test_dataset(dataset_path + "*", 256)

    x_test_transformed = x_test.map(model.preprocessing_layer)

    prediction = []
    for item in x_test_transformed:
        prediction.append(model.predict(item))

    y_pred = np.array(prediction).reshape(-1, 24)        
    y_true = np.array(list(tf.data.Dataset.as_numpy_iterator(y_true))).reshape(-1, 24)

    if metric_name == "mse":
        metric_value = calculate_loss(y_pred, y_true)
        print("metric_value:", metric_value)

    else:
        metric_name = 'N/A'
        metric_value = 0

    metrics = {
        'metrics': [{
            'name': metric_name,
            'numberValue': metric_value
        }]
    }

    return (metric_name, metric_value, json.dumps(metrics))

モデルのデプロイ

使用ファイル:
- kfp-pipeline/custom_prediction/predictor.py
- kfp-pipeline/custom_prediction/preprpcess.py
- kfp-pipeline/custom_prediction/setup.py

 5つ目のステップです。ここでは、モデルのデプロイを行うmlengine_deploy_opで使用するファイルを作成します。今回は、カスタム予測ルーチンを作成します。

 データの抽出・前処理を行ったのと同様の変換を行います。生のデータのDate列をtimestamp型へ、wind_direction列を数値型へと変換します。さらに、モデルに保存した前処理層を適用します。変換後の特徴量をモデルへとインプットすることで、予測が返されます。

custom_prediction/predictor.py
import os
import pickle

import numpy as np
import tensorflow as tf

import preprocess


class MyPredictor(object):
    def __init__(self, model):
        self._model = model

    def predict(self, instances, **kwargs):

        preprocessed_inputs = {}

        for i in instances:
            for k, v in i.items():
                if k not in preprocessed_inputs.keys():
                    preprocessed_inputs[k] = [v]
                else:
                    preprocessed_inputs[k].append(v)

        preprocessed_inputs["Date"] = [preprocess.convert_to_timestamp(i) for i in preprocessed_inputs["Date"]]
        preprocessed_inputs["wind_direction"] = [preprocess.direction_to_degree(i) for i in preprocessed_inputs["wind_direction"]]

        preprocessed_inputs = {
            k: tf.reshape(np.array(v, dtype=np.float32), shape=(-1, 24))
            for k, v in preprocessed_inputs.items()
        }

        transformed_inputs = self._model.preprocessing_layer(preprocessed_inputs)

        outputs = self._model.predict(transformed_inputs, steps=1).reshape(-1, 24)

        return outputs.tolist()

    @classmethod
    def from_path(cls, model_dir):

        model = tf.keras.models.load_model(model_dir)    

        return cls(model)

  カスタム予測ルーチンの作成方法は以下の通りです。

このページで説明しているカスタムコードの例をパッケージ化してアップロードするには、次の操作を行います。
1. 前のセクションで説明した preprocess.py、predictor.py、setup.py ファイルをすべて同じディレクトリに作成します。シェルで、このディレクトリに移動します。
2. python setup.py sdist --formats=gztar を実行して、dist/my_custom_code-0.1.tar.gz を作成します。
3. この tarball を Cloud Storage のステージング ロケーションにアップロードします。

 ファイルを保存したディレクトリに移動し、シェルで次のコマンドを実行します。

python setup.py sdist --formats=gztar
gsutil cp dist/my_custom_code-0.1.tar.gz gs://YOUR_BUCKET/PATH_TO_STAGING_DIR/

 以上で、5つのステップ全てが完了しました。これらをパイプラインにまとめて実行しましょう。

Pipelineの定義

使用ファイル:
- pipeline/model_training_pipeline.py

 パイプライン全体はこちらのファイルからご覧ください。ここまでの記述内容と重複する部分もありますが、順を追って説明します。まずは、コンポーネントの作成です。今回は、事前定義されたGoogle Cloudコンポーネントを2つ、軽量のPythonコンポーネントを3つ作成しました。それぞれのコンポーネントで使用するDockerfileはこちらからご覧ください。

pipeline/model_training_pipeline.py
import os

import kfp
from kfp.components import func_to_container_op

from helper_components import retrieve_best_run
from helper_components import evaluate_model
from preprocess_dataflow_pipeline import run_transformation_pipeline

# Setting defaults
BASE_IMAGE = os.getenv('BASE_IMAGE') # docker_images/base_image/Dockerfile
TRANSFORM_IMAGE = os.getenv('TRANSFORM_IMAGE') # docker_images/transform_image/Dockerfile
TRAINER_IMAGE = os.getenv('TRAINER_IMAGE') # docker_images/trainer_image/Dockerfile
EVALUATE_IMAGE = os.getenv('EVALUATE_IMAGE') # docker_images/evaluate_image/Dockerfile
COMPONENT_URL_SEARCH_PREFIX = os.getenv('COMPONENT_URL_SEARCH_PREFIX')

# Create component factories        
component_store = kfp.components.ComponentStore(
    local_search_paths=None, url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])

# Pre-build components
mlengine_train_op = component_store.load_component('ml_engine/train')
mlengine_deploy_op = component_store.load_component('ml_engine/deploy')

# Lightweight components
run_transform_pipeline_op = func_to_container_op(
    run_transformation_pipeline, base_image=TRANSFORM_IMAGE)
retrieve_best_run_op = func_to_container_op(
    retrieve_best_run, base_image=BASE_IMAGE)
evaluate_model_op = func_to_container_op(
    evaluate_model, base_image=EVALUATE_IMAGE)

 次に、パイプラインの定義です。weather_forecast_train関数内で、コンポーネントを呼び出しタスクを実行していきます。関数の引数はそれぞれコンポーネントへと渡されます。

pipeline/model_training_pipeline.py
import datetime

from kfp.gcp import use_gcp_secret

RUNTIME_VERSION = os.getenv('RUNTIME_VERSION')
PYTHON_VERSION = os.getenv('PYTHON_VERSION')

# Defining the pipeline
@kfp.dsl.pipeline(
    name='Weather-forecast Model Training',
    description='The pipeline training and deploying the Weather-forecast pipeline'
)
def weather_forecast_train(project_id,
        gcs_root,
        region,
        source_table_name,
        num_epochs_hypertune,
        num_epochs_retrain,
        num_units,
        evaluation_metric_name,
        evaluation_metric_threshold,
        model_id,
        version_id,
        replace_existing_version,
        hypertune_settings=HYPERTUNE_SETTINGS):

    # Creating datasets    
    job_name = 'preprocess-weather-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    dataset_location = '{}/{}/{}'.format(gcs_root, 'datasets', kfp.dsl.RUN_ID_PLACEHOLDER)

    create_dataset = run_transform_pipeline_op(
        source_table_name, job_name, gcs_root, project_id, region, dataset_location)

    # Tune hyperparameters
    tune_args = [
        '--training_dataset_path', create_dataset.outputs["training_file_path"],
        '--validation_dataset_path', create_dataset.outputs["validation_file_path"],
        '--num_epochs', num_epochs_hypertune,
        '--num_units', num_units,
        '--hptune', 'True',
        '--transform_artefacts_dir', create_dataset.outputs["transform_artefacts_dir"]
    ]

    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir/hypertune', kfp.dsl.RUN_ID_PLACEHOLDER)

    hypertune = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=tune_args,
        training_input=hypertune_settings).apply(use_gcp_secret()) # Kubernetesシークレットを使用しないと、長時間の訓練が途中で停止します。

    #Retrive the best trial
    get_best_trial = retrieve_best_run_op(
        project_id, hypertune.outputs['job_id'])

    # Re-training the model
    job_dir = '{}/{}/{}'.format(gcs_root, 'jobdir', kfp.dsl.RUN_ID_PLACEHOLDER)

    train_args = [
        '--training_dataset_path', create_dataset.outputs["training_file_path"],
        '--validation_dataset_path', create_dataset.outputs["validation_file_path"],
        '--num_epochs', num_epochs_retrain,
        '--num_units', num_units,
        '--learning_rate', get_best_trial.outputs['learning_rate'],
        '--dropout_rate', get_best_trial.outputs['dropout_rate'],
        '--hptune', 'False',
        '--transform_artefacts_dir', create_dataset.outputs["transform_artefacts_dir"]
    ]

    train_model = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=TRAINER_IMAGE,
        job_dir=job_dir,
        args=train_args).apply(use_gcp_secret())

    # Evaluating the model
    eval_model = evaluate_model_op(
        dataset_path=create_dataset.outputs['testing_file_path'],
        model_path=str(train_model.outputs['job_dir']),
        transform_artefacts_dir=create_dataset.outputs['transform_artefacts_dir'],
        metric_name=evaluation_metric_name)

    # Deploying the model
    with kfp.dsl.Condition(eval_model.outputs['metric_value'] < evaluation_metric_threshold):
        model_uri = '{}/predict'.format(train_model.outputs["job_dir"])

        deploy_model = mlengine_deploy_op(
            model_uri=model_uri,
            project_id=project_id,
            model_id=model_id,
            version_id=version_id,
            model = {"regions": [region],
    #                  "onlinePredictionLogging": True, # 同名のモデルがあると、デプロイ時にエラーが出るので、コメントアウトします。
                     "onlinePredictionConsoleLogging": True},
            version = {"packageUris": ["gs://[your_bucket]/staging/dist/my_custom_code-0.1.tar.gz"], # change your code
                       "predictionClass": "predictor.MyPredictor"},
            runtime_version=RUNTIME_VERSION,
            python_version=PYTHON_VERSION,
            replace_existing_version=replace_existing_version)

 まず初めに、run_transform_pipeline_opを実行します。Dataflowジョブが実行され、Cloud Storageにデータセットが保存されます。ジョブの実行結果は次の通りです。
スクリーンショット 2021-08-04 11.55.55(2).png

 次に、mlengine_train_opでHyperparameter Tuningを実行します。Optimizerのlearning_rateとDropoutの割合(dropout_rate)をチューニングします。以下がチューニングの設定です。設定については、こちらが参考になります。前述した通り、use_gcp_secretを使用して実行しなければ、長時間の訓練ジョブが途中で停止する場合があります。

 そして、retrieve_best_run_opで最良のハイパーパラメーターを取得し、mlengine_train_opで再度トレーニングを行い、モデルを保存します。evaluate_model_opでモデルの評価指標を計算したら、それが事前設定した閾値よりも良い場合に、mlengin_deploy_opを実行します。model, versionの2つの引数の設定には、こちらを参照ください。onlinePredictionConsoleLoggingをTrueにすることで、ログエクスプローラーからログを確認できるようになります。また、version引数で、カスタム予測ルーチンの設定をしています。

Pipelineの実行

使用ファイル:
- kfp-pipeline.ipynb
- cloudbuild.yaml

 それでは、上記のパイプラインをAI Platformのノートブックから手動で実行をします。ノートブックはこちらをご覧ください。ENDPOINTは画像のようにして確認できます。ARTIFACT_STORE_URIは、クラスターの作成時に自動で作成されるバケットを設定します。

kfp-pipeline.ipynb
REGION = 'XXXXXXXXXXX' #change your code
ENDPOINT = 'XXXXXXXXXX.pipelines.googleusercontent.com' #change your code
ARTIFACT_STORE_URI = 'gs://XXXXXX-kubeflowpipelines-default' #change your code

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

スクリーンショット 2021-06-24 15.30.20.png
スクリーンショット 2021-06-24 15.31.30.png

上述のノートブックのセルを順に実行することで、パイプラインが実行されます。学習が終了してモデルが保存されたら、saved_model_cliコマンドを使って、モデルの入出力の形状などを確認できます。saved_modelの扱いに関してはこちら

%%bash
saved_model_cli show --dir gs://model_dir --all

 また、オンライン予測ジョブを送信して、デプロイしたモデルが求める予測が返すかを確かめます。

kfp-pipeline.ipynb
from oauth2client.client import GoogleCredentials
from googleapiclient import discovery
from googleapiclient import errors

service = discovery.build('ml', 'v1')

def predict_json(project, model, instances, version=None):

    name = 'projects/{}/models/{}'.format(project, model)

    if version is not None:
        name += '/versions/{}'.format(version)

    response = service.projects().predict(
        name=name,
        body={'instances': instances}
    ).execute()

    if 'error' in response:
        raise RuntimeError(response['error'])

    return response['predictions']

 インプットには、こちらのサンプルデータを使います。24個の数字のリストが返されます。

kfp-pipeline.ipynb
import json

VERSION_ID = "v01"

instances = {}
with open("daily_data.json", mode="r") as f:
    instances = json.load(f)

predict_json(PROJECT_ID, MODEL_ID, instances, VERSION_ID)

CI/CDの設定

使用ファイル:
- kfp-pipeline.ipynb
- cloudbuild.yaml

 ここでは、リモートレポジトリへの新しいタグのpushをトリガーとして、パイプラインを自動実行するための設定を行います。前項において手動で実行していたDockerfileのビルド等の作業をyamlファイルに記述します。build構成ファイルの記述はこちらが参考になります。 "_"から始まる変数は、ユーザー定義の変数で、後から置換するものを表します。$PROJECT_ID$TAG_NAMEといった変数は自動で置換されます。

 以下の通り、5つのDockerfileのビルドに続いて、パイプラインのコンパイル・アップロード・ランが行われます。

cloudbuild.yaml
steps:
# Build the trainer image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME', '.']
  dir: $_PIPELINE_FOLDER/docker_images/trainer_image
  id: 'Build the trainer image'

# Build the transform image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME', '.']
  dir: $_PIPELINE_FOLDER/docker_images/transform_image
  id: 'Build the transform image'

# Build the evaluate image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME', '.']
  dir: $_PIPELINE_FOLDER/docker_images/evaluate_image
  id: 'Build the evaluate image'

# Build the base image for lightweight components
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME', '.']
  dir: $_PIPELINE_FOLDER/docker_images/base_image
  id: 'Build the base image'

# Build the base image for lightweight components
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'gcr.io/$PROJECT_ID/kfp-cli:latest', '.']
  dir: $_PIPELINE_FOLDER/docker_images/kfp-cli
  id: 'Build the kfp-cli'

# Compile the pipeline
- name: 'gcr.io/$PROJECT_ID/kfp-cli'
  args:
  - '-c'
  - |
    dsl-compile --py $_PIPELINE_DSL --output $_PIPELINE_PACKAGE
  env:
  - 'BASE_IMAGE=gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME'
  - 'TRAINER_IMAGE=gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME'
  - 'TRANSFORM_IMAGE=gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME'
  - 'EVALUATE_IMAGE=gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME'
  - 'RUNTIME_VERSION=$_RUNTIME_VERSION'
  - 'PYTHON_VERSION=$_PYTHON_VERSION'
  - 'COMPONENT_URL_SEARCH_PREFIX=$_COMPONENT_URL_SEARCH_PREFIX'
  dir: $_PIPELINE_FOLDER/pipeline
  id: 'Compile the Pipeline'

 # Upload the pipeline
- name: 'gcr.io/$PROJECT_ID/kfp-cli'
  args:
  - '-c'
  - |
    kfp --endpoint $_ENDPOINT pipeline upload -p ${_PIPELINE_NAME}_$TAG_NAME $_PIPELINE_PACKAGE
  dir: $_PIPELINE_FOLDER/pipeline
  id: 'Upload the Pipeline'

 # Deploy the pipeline in KFP
- name: 'gcr.io/$PROJECT_ID/kfp-cli'
  args:
  - '-c'
  - |
    kfp --endpoint $_ENDPOINT run submit \
        -e $_EXPERIMENT_NAME \
        -r $_RUN_ID \
        -p `kfp --endpoint $_ENDPOINT pipeline list | grep -w ${_PIPELINE_NAME}_$TAG_NAME | grep -E -o -e "([a-z0-9]){8}-([a-z0-9]){4}-([a-z0-9]){4}-([a-z0-9]){4}-([a-z0-9]){12}"` \
        project_id=$PROJECT_ID \
        gcs_root=$_GCS_STAGING_PATH \
        region=$_REGION \
        source_table_name=$_SOURCE_TABLE \
        num_epochs_hypertune=$_NUM_EPOCHS_HYPERTUNE \
        num_epochs_retrain=$_NUM_EPOCHS_RETRAIN \
        num_units=$_NUM_UNITS \
        evaluation_metric_name=$_EVALUATION_METRIC \
        evaluation_metric_threshold=$_EVALUATION_METRIC_THRESHOLD \
        model_id=$_MODEL_ID \
        version_id=$TAG_NAME \
        replace_existing_version=$_REPLACE_EXISTING_VERSION
  dir: $_PIPELINE_FOLDER/pipeline
  id: 'Deploy and Run the Pipeline'
  waitFor: ['Upload the Pipeline']

# Push the images to Container Registry 
images:
- gcr.io/$PROJECT_ID/$_TRAINER_IMAGE_NAME:$TAG_NAME
- gcr.io/$PROJECT_ID/$_TRANSFORM_IMAGE_NAME:$TAG_NAME
- gcr.io/$PROJECT_ID/$_EVALUATE_IMAGE_NAME:$TAG_NAME
- gcr.io/$PROJECT_ID/$_BASE_IMAGE_NAME:$TAG_NAME

# Changing the timeout threshold
timeout: 3600s

それでは、手動でのビルドを実行してみましょう。yamlファイル内の変数値の置換は、subsutitutionsで指定します。

kfp-pipeline.ipynb
SUBSTITUTIONS="""
_ENDPOINT={},\
_TRAINER_IMAGE_NAME=trainer_image,\
_TRANSFORM_IMAGE_NAME=transform_image,\
_EVALUATE_IMAGE_NAME=evaluate_image,\
_BASE_IMAGE_NAME=base_image,\
TAG_NAME=v01,\
_PIPELINE_FOLDER=.,\
_PIPELINE_DSL=model_training_pipeline.py,\
_PIPELINE_PACKAGE=model_training_pipeline.yaml,\
_PIPELINE_NAME=weather_forecast_continuous_training,\
_RUNTIME_VERSION=2.5,\
_PYTHON_VERSION=3.7,\
_COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/gcp/,\
\
_EXPERIMENT_NAME=Weather_Forecast_Training,\
_RUN_ID=Run_001,\
_GCS_STAGING_PATH=gs://##########-kubeflowpipelines-default/staging,\
_REGION=asia-northeast1,\
_SOURCE_TABLE=weather_data.tokyo,\
_NUM_EPOCHS_HYPERTUNE=1,\
_NUM_EPOCHS_RETRAIN=1,\
_NUM_UNITS=128,\
_EVALUATION_METRIC=mse,\
_EVALUATION_METRIC_THRESHOLD=10.0,\
_MODEL_ID=weather_forecast,\
_REPLACE_EXISTING_VERSION=True
""".format(ENDPOINT).strip()

# 手動ビルド
!gcloud builds submit . --config cloudbuild.yaml --substitutions {SUBSTITUTIONS}

 それでは、Cloud Buildの設定を行い、パイプラインの実行の自動化を実現します。こちらのドキュメントを参考に進めます。今回は、リポジトリへの新しいタグのpushをトリガーとして、cloudbuild.yamlファイルが読み込まれ、タスクが実行されます。

スクリーンショット 2021-08-01 22.39.25(2).png

スクリーンショット 2021-08-01 22.41.53(2).png

 最後に、cloudbuild.yamlへの代入変数をリストアップします。

スクリーンショット 2021-08-01 22.41.45(2).png
スクリーンショット 2021-08-01 22.42.05(2).png

 シェルで以下のコマンドを実施すると、パイプラインの実行が始まります。 Cloud Buildの履歴から実行のログが確認できます。

git tag (tag name)
git push origin main --tag

スクリーンショット 2021-08-01 23.04.12.png

自動スクレイピング

使用ファイル:
- scheduled_scrayper/main.py
- scheduled_scrayper/requirements.txt

こちらのプログラムによって、BigQueryのテーブルに1日1回のレコードの追加が行われます。また同時に、翌日の気温の予測を行うためのオンライン予測ジョブをが実行され、予測値はCloud Storageへと保存されます。※カスタム予測ルーチンを含むモデルでは、バッチ予測ができません。
 ここでは、このプログラムが定期実行されるようにCloud FunctionsCloud Schedulerの設定を行います。こちらのドキュメントに倣い設定します。Cloud Schedulerの設定で頻度を次のように指定します。これは、実行を1日1回午前2時にスケジュールすることを意味します。

スクリーンショット 2021-08-01 15.20.54(2).png

 BigQueryのテーブルへの書き込みやCloud Storageへのアップロードについては、以下が参考になります。

終わりに

 今回は、Kubeflow Pipelinesを用いた機械学習パイプラインの自動化を実践しました。完全なパイプラインには程遠い部分もありますが、新しい要素の追加も比較的容易に行えるようになっていると思います。 今後は、TensorFlow Data Validationなど新たなコンポーネントの導入をしたいと考えています。

 実務未経験者の記事なので間違いなどがございましたら、ご指摘いただけると幸いです。ご覧いただきありがとうございました。

【追記:2021/08/14】
モデルのデプロイ時に、訓練したモデルとデフォルトのモデルとを同じテストセットで評価してから、訓練指標の良い方をデプロイする様に変更しました。
また、新しいモデルがデプロイされると自動的にデフォルトに設定されます。
GitHubレポジトリは更新済みです。当記事は、後ほど変更します。

16
15
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
15