こんにちは,NTTドコモ入社4年目の石井です.
業務では機械学習を用いたレコメンデーション機能の開発に取り組んでいます.
4日の記事では本業務とは直接的には関係ないですが,最近ではDevOpsと並んで盛り上がりを見せているMLOpsの領域から実行サイクルを円滑化してくれるワークフローエンジンである Airflow を用いたパイプライン実行について紹介します.
はじめに
最近ではCI/CDと行った技術はソフトウェア開発の現場では当たり前のように活用されていると思いますが,機械学習における継続的デプロイは明確なベストプラクティスがまだ定義されておらず,各々の置かれている状況や環境に応じて様々な形をとっていると思われます.特に研究開発の部署では議論の中心はモデリングになってくるため学習モデルの管理や継続的デプロイについては優先度が下がってしまうのが現状です.
一方でGoogle社が2015年に投稿した論文1内に記載されている以下の図のように基本的に機械学習システムにおけるモデリングの領域(機械学習)は非常に小さいことが分かると思います.最近では社内でも機械学習を活用したシステムを運用する機会が増えてきており,今後は研究開発によるモデリングからエンドユーザが利用するServingまでの全体最適をどうするかの議論が活発化すると考えられます.
出典: Hidden Technical Debt in Machine Learning Systemsより
そこで,今回はデータサイエンティストとしてモデリング外の領域を広げていくことを目指して個人的に興味を持っていたAirflowを用いたパイプラインを実践していこうと思います.
ゴール
今回のゴールですがAirflowを用いたパイプラインを作成して学習モデルの生成からモデルのオンライン推論までを実践していこうと思います.このゴール設定にした理由はモデルの学習からオンライン推論を実施する部分までを記載している記事を単純に私が見つけられなかったからです...
さて,具体的には以下のシナリオを実施して行こうと思います.
- Airflowを用いたDAG記述によるフローを定義とAirflowの実行
- APIをトリガーとして作成したタスク実行
- Titanicデータを使ったモデル学習タスクによる学習モデル生成
- 学習モデルのTensorflowServing環境へのデプロイ
- タスク完了ステータスのSlackチャネル通知
- 推論結果をAPIリクエストにて確認
Airflowとは
Airflow2はApacheのトップレベルプロジェクトとして開発されているOSSになります.その役割はワークフローの記述やスケジューリング,モニタリングといった機能を提供するプラットフォームとなっています.つまり,これまでパイプラインの作成に際して手動でCronバッチを作成したり,ログ管理を行ったり,データを取得するために個別でスクリプトを用意していたところをAirflowによって一元的に管理しながら継続実行していくことを可能にします.
そして,Airflowの仕組みについてですがAirflowでは大きく3つのコンポーネントからから構成されています.
-
WebServer
WebServerはDAGの設定内容や進行状況の確認等のモニタリング機能を提供します.DAGなどの設定情報や実行履歴などは接続しているDBに保存しており,それらの情報を読み取ってWeb画面上に表示します. -
Scheduler
SchedulerはDAGの一覧を監視して実行可否を判断し,DAGを実行する機能を提供します.また,Brokerと呼ばれるタスクキューにジョブを投入する役割も担います. -
Worker
Brokerに積まれているジョブをdequeueしてタスクを実行するインスタンスとしての役割を担い,並列分散処理を実現する機能を提供します.
これらの3つのコンポーネントによって構成されているAirflowですが,Airflowを簡単に試したいという場合には単一処理としてシーケンシャルにタスクを実行することも可能です.その場合はBrokerを用いる必要はありませんが,基本的には並列分散で実行したいというシーンの方が多いと思いますのでBrokerはほぼ必須と考えて良いと考えられます.
また,AirflowではWebServerを介してREST APIが提供されているため,/api/experimental/
のエンドポイントを利用して外部からAirflowを操作することもできます.この点は外部のシステムと連携しやすさを考えると非常に便利です.
TensorflowServingとは
TensorflowServing3は学習済みモデルをサービスとしてデプロイし、プロダクション環境で柔軟でパイパフォーマンスなオンライン推論APIを実現するソフトウェアです。Google社がApache2.0ライセンスとして公開しているOSSでtensorflow extended内のコンポーネントとしても利用されています.
ここではTensorflowServingの詳細は本題から外れてしまうため割愛しますが,ポイントを簡単にお伝えするとtensorflow servingはC++で記述されていることから高速で動作し,さらにはgRPCという通信プロトコルによるリクエストを受け付けるインターフェースを提供してくれるためモデルをプロダクション環境で運用する際には非常に有効なツールとなっています.また,学習モデルのバージョン管理や並列運用も可能となっており,至れり尽くせりの機能を提供しています.興味がある方は是非一度触って見てください.
ただ,個人的に唯一改善してほしいと感じたのは対応しているモデル形式です.Tensorflowで利用されているSavedModel形式のみに対応しているため,他のアルゴリズムやフレームワークで作成したモデルを簡単にデプロイすることができません.この点は今後拡張して行ってもらいると非常に嬉しいですね.
システムアーキテクチャ
最終的に構成するシステムアーキテクチャの全体像をまとめておきます.
今回は1つのサーバーノードの中にシステムを構築していき,以下のような構成を再現していこうと思います.
Airflowの情報を管理するデータベースにはPostgresを利用します.そして,SchedulerからWorkerへジョブを受け渡すBrokerにはRedisを用いました.
データベースやBrokerの選定には特に理由はありませんが,Airflowの公式ドキュメントで推奨しているデータベースということでPostgresを,BrokerにはRedisを利用しました.バックエンドで利用できるデータベースに大きな制約はありませんが,AirflowがSqlAlchemyを利用してデータベースの操作を行う関係で,SqlAlchemyがサポートしているデータベースを採用すべきと公式ドキュメントに注意書きがあったのでやはり推奨のものを利用するのがベターのようです.
検証実行環境
- Amazon Linux 2 (t2.large)
- Python 3.7.4
- Airflow 1.10.6
- tensorflow 2.0
- tensorflow serving 1.10.0
モデル作成
モデルはKaggleでお馴染みのTitaicのデータセットを利用して生存者を分類する学習モデルを生成します.データ取得はオンラインストレージ上に格納されている場所から取得するため,学習タスクの中でデータ取得の処理も併せて実行していきます.Titanicのデータがどのようなデータが見たことのない人のためにサンプルで以下にデータの一部を記載しておきます.
フレームワークはTensorflow Servingで扱うSavedModel形式のモデルを生成したいためTensorflowを利用し,モデルのアルゴリズムはGradientBoostingTreeを用いてモデルを学習します.以下に今回利用するモデルを生成するコードを記載します.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
CATEGORICAL_COLUMNS = ['sex', 'n_siblings_spouses', 'parch',
'class', 'deck', 'embark_town', 'alone']
NUMERIC_COLUMNS = ['age', 'fare']
class Train:
def __init__(self):
self.model = None
self.X_train = None
self.X_eval = None
self.y_train = None
self.y_eval = None
self.feature_columns = []
def load(self):
self.X_train = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv')
self.X_eval = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv')
self.y_train = self.X_train.pop('survived')
self.y_eval = self.X_eval.pop('survived')
def train(self):
tf.random.set_seed(123)
fc = tf.feature_column
num_examples = len(self.y_train)
def one_hot_cat_column(feature_name, vocab):
return tf.feature_column.indicator_column(
tf.feature_column.categorical_column_with_vocabulary_list(feature_name, vocab))
def make_input_fn(X, y, num_example, n_epochs=None, shuffle=True):
def input_fn():
dataset = tf.data.Dataset.from_tensor_slices((X.to_dict(orient='list'), y))
if shuffle:
dataset = dataset.shuffle(num_example)
dataset = (dataset
.repeat(n_epochs)
.batch(num_example))
return dataset
return input_fn
for feature_name in CATEGORICAL_COLUMNS:
vocabulary = self.X_train[feature_name].unique()
self.feature_columns.append(
one_hot_cat_column(
feature_name,
vocabulary
)
)
for feature_name in NUMERIC_COLUMNS:
self.feature_columns.append(
tf.feature_column.numeric_column(
feature_name,
dtype=tf.float32
)
)
train_input_fn = make_input_fn(self.X_train, self.y_train, num_examples)
eval_input_fn = make_input_fn(self.X_eval, self.y_eval, num_examples, shuffle=False, n_epochs=1)
params = {
'n_trees': 50,
'max_depth': 3,
'n_batches_per_layer': 1,
'center_bias': True
}
self.model = tf.estimator.BoostedTreesClassifier(self.feature_columns, **params)
self.model.train(train_input_fn, max_steps=5)
results = self.model.evaluate(eval_input_fn)
return results
def export(self, path):
feature_spec = tf.feature_column.make_parse_example_spec(self.feature_columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
self.model.export_saved_model(
path,
serving_input_receiver_fn
)
def execute():
output = "/".join([os.environ["AIRFLOW_HOME"],"models", "gbdt"] )
tr = Train()
tr.load()
tr.train()
tr.export(os.getenv("MODELPATH", output)
Dag記述
次にAirflow側のワークフローを記述していきます.
AirflowではDAG(Directed Acyclic Graph)と呼ばれる有効非巡回グラフを用いてタスクの記述や条件分岐,並列処理などをまとめていきます.
DAGの記載方法はインスタンス化したOperatorを用いてタスクを登録し,タスク間の依存関係を定義します.そして,Operatorにはいくつか種類がありメジャーなところだとBashシェルを実行するBashOperatorやPythonのメソッドを実行できるPythonOperator,GCP上のBigQueryにクエリを発行するBigQueryOperatorなどがあるので実現したいタスクに応じて様々なOperatorを使い分けることになります.もっと他のOperatorについて知りたいという方はAirflowの公式ドキュメント4をご覧ください.
それでは早速DAGを定義してきましょう.今回は以下の表のようにタスクをOperatorを用いて記述してDAGを定義しました.
タスク名 | Operator | 説明 |
---|---|---|
start, end | BashOperator | Bashシェルコマンドを実行して標準出力を行います.実行開始や終了の出力をします. |
train | PythonOperator | データの読み込みからTensorflowにおける学習モデル生成までの処理を実施します. |
branch | BranchPythonOperator | 既存でTesorflow Servingが動作しているかを確認して次の処理を選択します. |
deploy | BashOperator | TensorflowServingのDockerImageを用いて学習モデルをデプロイします. |
update | BashOperator | Tensorflow Serving上のモデルを更新します. |
join | DummyOperator | 条件分岐したタスクを結合するダミータスクを実施します. |
slack | SlackWebhookOperator | slackのWebhhookを利用してDAGの処理が完了したことを通知します. |
上記のタスクを定義したサンプルコードは以下になります.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import pytz
import docker
import datetime
from datetime import datetime, timedelta
from src.train import execute
import airflow
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
START_TIME = datetime.now(pytz.timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S')
CLI = docker.DockerClient(base_url=os.getenv('DOCKER_URL', "unix://var/run/docker.sock"))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.today(),
'email': None,
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG(
'ml_pipeline',
description='DAG for machine learning pipeline that performs from model learning to deployment',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
def branch_func(**kwargs):
try:
CLI.containers.get("serving")
return "update"
except:
return "deploy"
slack_message = " \
【Slack ML Notification】\n\
*ステータス:* Success\n\
*タスクID:* ml_pipeline\n\
*実行開始時間:* " + START_TIME
start_command = """
echo "=================="
echo " Airflow Task Run "
echo "=================="
"""
end_command = """
echo "====================="
echo " Airflow Task Finish "
echo "====================="
"""
deploy_command = """
docker run -d \
-p 8500:8500 \
-p 8501:8501 \
--name "serving" \
-e MODEL_NAME={{params.model}} \
-v "$AIRFLOW_HOME/models:/models" \
tensorflow/serving
"""
update_command = """
docker cp $AIRFLOW_HOME/models serving:/models
docker restart serving
"""
start = BashOperator(
task_id='start',
bash_command=start_command,
dag=dag
)
end = BashOperator(
task_id='end',
bash_command=end_command,
dag=dag
)
train = PythonOperator(
task_id='train',
python_callable=execute,
dag=dag
)
branch = BranchPythonOperator(
task_id='branch',
python_callable=branch_func,
dag=dag
)
deploy = BashOperator(
task_id='deploy',
bash_command=deploy_command,
params={"model": "gbdt"},
dag=dag,
)
update = BashOperator(
task_id='update',
bash_command=update_command,
dag=dag
)
join = DummyOperator(
task_id='join',
trigger_rule='none_failed',
dag=dag
)
slack = SlackWebhookOperator(
task_id='slack',
http_conn_id='slack_webhook',
webhook_token=BaseHook.get_connection('slack_webhook').password,
message=slack_message,
channel='#learning-notice',
username='Airflow',
dag=dag
)
start >> train >> branching
branch >> deploy >> join
branch >> update >> join
join >> slack >> end
これで今回利用するファイルの説明は終了です.
学習によって出力したモデルを本来はバージョン管理したいところですが今回は簡略化のため,とりあえず最新のモデルを継続してデプロイするだけにしました.
次からはAirflowの環境構築を行って実行していきましょう.
環境構築
システムアーキテクチャの項目で記載した構成に従ってAirflowと各種ソフトウェア環境を構築していきます.
まず初めに周辺環境のセットアップから初めてAirflowの設定していこうと思います.
周辺環境のセットアップ
PostgresとRedisはDockerコンテナを用いて構築します.
以下のコマンドを実行するだけで簡単にセットアップできます.
$ docker run -d -p 6379:6379 redis:latest redis-server
$ docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=airflow -e POSTGRES_DB=airflow -e POSTGRES_USER=airflow postgres:latest
データベースのデータを永続化したい場合はローカルのボリュームをマウントしてコンテナを立ち上げます.
また,データベースの認証情報はairflow
に統一していますがセキュリティ上よろしくないのでご自身の環境で適宜修正してください.
サーバ環境変数の設定
Airflowの設定ファイルやDAGファイルの参照先は実行環境内で何も指定しない場合は$HOME/airflow
に設定され,当該ディレクトリ内にある設定ファイルやDAGの情報を参照します.そのため,利用したいDAGが$HOME/airflow
以外にある場合はAirflowのルートパスを変更する必要があります.その場合は環境変数のAIRFLOW_HOME
でルートパスを指定します.
以下内容を.bash_profile
や.bashrc
ファイルに追記して環境変数を読み込んでください.
export AIRFLOW_HOME=<プロジェクトのパス>
$ source ~/.bashrc
Airflowのインストールと初期化
Airflowのインストールはpipコマンドで簡単にインストールできます.
$ pip install apache-airflow apache-airflow[postgres] apache-airflow[celery]
インストールが完了したらAirflowで利用するデータベースを初期化します.
このコマンドを実行するとAIRFLOW_HOME配下にairflow.cfgとairflow.dbというファイルが生成されます.
airflow.cfgはairflowの設定ファイルでairflow.dbはSQLiteにて作成されたデータベースを保存するファイルになります.デフォルトで利用されるデータベースはSQLiteであるため,以下のコマンドを実行したタイミングで自動的にairflow.dbが作成されますが今回はpostgresを利用するためこちらのファイルは利用しません.
$ airflow initdb
airflow.cfgファイルの修正
Airflowの設定ファイルは先ほど説明したようにairflow.cfgとなります.
設定ファイルでは接続するデータベースやBroker,ロギングの内容などを設定します.
今回は分散環境でタスクを実行するためExecutorにCeleryExecutorを指定する必要があります.airflow.cfgの初期設定ではSequntialExecutorになっているため以下の通りに修正していきます.また,併せて接続するデータベースの情報もDockerコンテナで作成したデータベース情報に合わせて変更します.
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = postgresql://airflow:airflow@localhost:5432/airflow
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = redis://localhost:6379/0
# The Celery result_backend. When a job finishes, it needs to update the
# metadata of the job. Therefore it will post a message on a message bus,
# or insert it into a database (depending of the backend)
# This status is used by the scheduler to update the state of the task
# The use of a database is highly recommended
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow
Airflow内の環境変数設定
SlackOperatorでSlakに通知する際に利用する認証情報をAirflowのConnectionsに設定します.
Dagのスクリプト内に認証情報を直接記載しても実行できますが,WebUIにアクセスする人が誰でもSlackの認証情報を閲覧できてしまうためセキュリティの観点でよろしくありません.そのためConnectionsに認証情報を設定するようにします.Slackの認証情報はSlackのWebhook設定ページにアクセスして,新しくIncoming WebHookを作成することで発行できます.発行した後はWebHookURLの項目にあるURLをコピーしてください.
次にAirflowで利用するデータベースにSlackの認証情報を設定します.airflowのCLIを利用して以下のコマンドで取得したSlackの認証情報を反映させます.
$ airflow connections --add \
--conn_id=slack_webhook \
--conn_type=http \
--conn_host="https://hooks.slack.com/services" \
--conn_extra='{"webhook_token": "/TXXXXXXXXXXXX/BYYYYYYYYYYYYY/ZZZZZZZZZZZZZZZZZ"}'
上記のパラメータの説明ですが,conn_id はSlackOperator内で呼び出すキーとなりますのでご自身の環境で自由に書き換えてください.変更した場合はDAG内のSlackOperatorでhttp_conn_idに指定する環境変数のキーも設定したものに書き換える必要がありますのでご注意ください.その他のパラメータは固定でconn_typeは通信プロトコルになるためhttpを指定,conn_hostはSlackのサーバホスト名になるのでWebhookURLのserviceまでの部分,最後のconn_extraではWebhookURLのservice以降のトークンを指定します.
DAGファイルの配置
前述で指定したAIRFLOW_HOMEで指定したルートパスの配下にdagsフォルダを作成して,dags配下に作成したDAGのスクリプトを配置します.
dagsフォルダ配下に配置したファイルは自動的にairflow側で読み込まれるためワークフローを定義したDAGファイルを配置するだけで簡単にパイプラインを追加することができます.何も設定しなくても勝手に読み込んでくれるので非常に使い勝手が楽です.
また,PythonOperatorで利用する学習を実行するスクリプトファイルのtrain.py
はsrcフォルダ配下に配置してpipeline.py
から呼び出すようにしました.PYTHONPATHにはAIRFLOW_HOMEまでのパスを追加して実行します.
root/
├ README.md
├ requirements.txt
├ airflow.cfg
├ dags/
│ └ pipeline.py
├ src/
│ └ train.py
Airflowのタスク実行
ここまででAirflowを実行するための周辺環境整備はできたのでここからは実行していきます.
Airflowの起動
Airflowを実行するためには3つのコンポーネントを動作させる必要があるため,以下の3つのコマンドを実行してプロセスを立ち上げます.
$ airflow webserver & # webserverの起動
$ airflow scheduler & # schedulerの起動
$ airflow worker & # workerの起動
Airflowによる管理画面
Airflowが正常に動作していることをブラウザで確認してみましょう.
ブラウザを起動してAirflowが動作しているサーバのホスト名にアクセスしてみます.正常に動作している場合はAirflowで実行できるDAG一覧画面が表示されます.
DAG一覧の中から自身で作成したDAGを選択して先ほどのGraphViewタブを選択するとDAG記述で記載した内容がフローとして表示されるので定義した通りのワークフローになっているかを確認できます.今回は学習を実施してモデルを生成し,TensorflowServingでデプロイが完了したらSlackに通知するということで想定通りのフローが表示されていました.
APIトリガーによるタスク実行
作成したDAGを実行していきます.
トリガーとしてはいくつか種類がありますが,AirflowのREST APIにリクエストを投げて実行します.DAGを実行するAPIのエンドポイントはデフォルトでは/api/experimental/<自身のdag名>/dag_runs
になりますので,実行したいDAGの名前を指定してエンドポイント対してPOSTリクエストを投げます.
$ curl -X POST -d "{}" http://<Host>:8080/api/experimental/dags/ml_pipeline/dag_runs
上記コマンドの<Host>
にはAirflowが動作しているご自身のホスト名で置き換えて実行してください.リクエストが正常に行われるとAirflow側でタスクのインスタンスが生成されてタスクをキューに蓄積されていくのを確認できます.
処理中画面
実行後はただ処理が完了するまで待つだけです.
Slack通知
Slackタスクが完了したら実行した事前に登録したSlackのチャネルに通知してくれます.個人的にはモデルを学習させて逐一学習が終わったかを確認することが非常に煩わしいと感じているためコミュニケーションツールと簡単に連携できて学習が終了したタイミングで知らせてくれるのは非常にありがたいと感じました.
推論実施
最後に学習モデルがデプロイされた推論サーバに対してAPIリクエストを投げて推論結果を確認してみましょう.
$ curl -X POST -d \
"{ \
\"examples\": [{ \
\"sex\": [\"male\"], \
\"age\": [22.0], \
\"n_siblings_spouses\": [1], \
\"parch\": [0], \
\"fare\": [7.2500], \
\"class\": [\"Third\"], \
\"deck\": [\"unknown\"], \
\"embark_town\": [\"Southampton\"], \
\"alone\": [\"y\"] \
}] \
}" \
http://<Host>:8501/v1/models/gbdt:classify
無事にモデルがデプロイされて推論結果を取得することができました.
結果は "0" ラベルということで正しく推論できていそうですね.
{
"results": [[["0", 0.65634954], ["1", 0.343650401]]]
}
まとめ
最後まで読んで頂きありがとうございました.皆様いかがでしたでしょうか.
今回はAirflowを用いたワークフローを用いて,モデルの学習からデプロイまでを実施してみました.
実際にAirflowに触ってみるとPythonスクリプトでワークフローを簡単に記述することができるお手軽さや外部システムと連携する際のハードルの低さといった多くのメリットを感じました.また,TensorflowServingによるオンライン推論まで自動化できると作成した学習モデルの民主化等にも貢献してくるのではないかと思います.
一方で課題だと感じる部分もありました.Airflow上で複数のタスクが同時に運用されている環境ではリソース管理は当然課題に上がってくると思いますし,分散処理によるマルチクラスタにおいてログをどうやって集約するのかといった問題などもありそうです.
今回の取り組みの中でも実際にAirflowをEC2上で動作させていた時にインスタンスタイプを少なめに見積もってしまった結果,リソース不足が発生してプロセスが落ちるといったことを経験しました.やはりリソース監視とスケーリングによる対策は必至でありそうな感覚を覚えました.
とは言えAirflowは有効なツールであることは間違いないので今後の動向は継続してウォッチしていこうと思います.まだまだ手探りの状態で扱いきれていない機能も多々あるので今度はBigQueryあたりと接続したワークフローとかにチャレンジしたいと思います.