GCP上ではETL用途のパイプラインで利用されるケースをよく見かけるCloud Dataflowですが、前処理だけでなく機械学習も含めたパイプライン実行にも使えないかと試してみたので備忘録として。
やりたいこと
学習の試行錯誤を大量にする際に手元で小規模に動かしていた学習・予測のスクリプトを手軽に分散環境で大量実行できるようにしたい。手元で動かしていた学習・予測用のスクリプトを毎回リモートでも動くように修正しマシンを立ててバラまくのも面倒だったので。
あと属性生成などの前処理から学習・評価までを一筆書きに動かせる状態にしておきたい。前処理や学習部分のコードが分断されていると注意深くコードや中間データのバージョンを管理していないと予測モデルを再現できなくなってしまうし、パイプラインとして実装しておけばシステムに組み込む際にも楽ができそうなため。
インストール
手元でパイプライン実行したりクラウドにjobをsubmitする作業用マシンで以下をインストールします。
(クラウドで動かす場合はPythonは2系しか現時点では対応していないので注意)
- Apache Beamのインストール
git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz
- Cloud Dataflowのインストール
pip install --upgrade google-cloud-dataflow
自分の環境ではそれぞれ0.6.0と0.5.5がインストールされました。
あとは手元の環境で動かすのに必要なscikit-learnやpandasなどのライブラリをインストールしておきます。
scikit-learnの学習・予測パイプラインを動かす
ここではあらかじめDataflowの実行環境にインストールされているpandasやscikit-learnを使って以下のような想定の学習・予測のパイプラインを考えてみます。
- 小売りチェーンの店舗ごとに毎日の売り上げを予測する
- 店舗単位ごとに予測モデルを作成する
- 過去2年分のデータを学習し予測モデルを作り、それを使い直後の1年を予測する
- 上記学習・予測を1年ずつずらしながら実施する
- (2011,2012で学習し2013を予測、2012,2013で学習し2014を予測、...)
ここではデータは学習・評価用とも事前に作成しBigQueryに入れてあり、ハイパーパラメータは決定済みで大量の予測モデルをまとめて評価したいケースを想定しています。年をずらして予測しているのは学習モデルの継時劣化対応のため毎年再学習をするという想定です。
疑似的に以下のようなクエリで取得するデータを例に説明を進めます。
SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table
shop_idが店舗のユニークキーでsalesが目的変数、attr1-3が属性という想定です。
オプション
以下Pipelineの設定項目を入れていきます。
import apache_beam as beam
import apache_beam.transforms.window as window
options = beam.utils.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10
#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'
pipeline = beam.Pipeline(options=options)
GoogleCloudOptionsではGCP上で動かすための設定を記載していきます。staging_locationやtemp_locationなどで実行ファイルや一時ファイル置き場を指定します。
WorkerOptionsではワーカの設定をしています。デフォルトでは負荷に応じてGCPが自動的に構成を決定してくれます。
(日本語版ドキュメントではPython未対応と書かれていますが英語版では対応しているとの記載)
オートスケールOnの時でも最大ワーカ数をmax_num_workerで指定してスケールを制限することができます。
StandardOptionsでパイプラインを動かす環境を指定しています。
DirectRunnerを指定すると手元の環境で動き、DataflowRunnerにするとGCP上で動きます。
小さいワークロードを手元で動作確認して問題なければクラウド上で動かすとよさそうです。
オプション設定は他にもいろいろあり、コマンドラインのヘルプやソースのコメントで確認できます。
パイプライン定義
パイプラインは各処理をパイプ演算子で順繰りにつなげていく形で記述していきます。
(pipeline
| "Query data" >> beam.Read(beam.io.BigQuerySource(query=query))
| "Assign time" >> beam.Map(assign_timevalue)
| "Set window" >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
| "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
| "Group by group key and time window" >> beam.GroupByKey()
| "Learn and predict" >> beam.FlatMap(learn_predict)
| "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))
pipeline.run()
1つめのstepでBigQueryからクエリを指定してデータを読み込んでいます。
2つめのstepで次のstepのウィンドウの範囲を決める基準となる値を指定しています。今回は年単位でデータを区切るので各データの年を指すカラムを指定します(内容は後述)
3つめのstepでウィンドウの幅と間隔を指定しています。今回は幅3年(2年学習1年予測)で1年ずつずらすのでsize=3,period=1とします。SlidingWindowsはスライド用のウィンドウですが固定用のFixedWindowsやセッション用のSessionsなど他にもいろいろあります。
4つめのstepでグループ指定したいKeyとして店舗IDを指定しています
5つめのstepで先に指定したウィンドウとkey(店舗)ごとにグループ化されます。
6つめのstepはグループ化されたデータごとに学習・予測処理を行い、予測結果を返しています。FlatMapとしているのは店舗xウィンドウ単位で集約されたデータを日別にばらし直して返しているためです。
7つめのstepで日別の予測結果をBigQueryに保存しています。
最後の段のpipelineをrunされたタイミングでパイプラインが実行されます。
次に各関数の中について見ていきます。
def assign_timevalue(v):
import apache_beam.transforms.window as window
return window.TimestampedValue(v, v["year"])
ウィンドウで使う値を指定するにはを値をTimestampedValueに置き換えやります。TimestampedValueの1つめには値を、2つめにはウィンドウで使う値を指定しています。ここで注意が必要なのは関数内でパッケージやモジュールを参照するためにimportを指定する必要があることです。手元で動かす分には問題ないのですがクラウド上ではこの関数はワーカノードで分散されて実行されます。ワーカノードのさらの環境でも動くようにパッケージをimportしてやる必要があります。グローバルに定義した変数などもクラウド上ではアクセスできないので要注意です。
def learn_predict(records):
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
target_attr = 'sales'
learn_attrs = ['attr1', 'attr2', 'attr3']
data = pd.DataFrame(records[1])
data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
data = data.fillna(0.)
if len(data["year"].unique()) < 3:
return [] # 3年に満たない組み合わせは何もしない
year_max = data["year"].max()
train = data[data["year"] < year_max] # 前2年は学習用
test = data[data["year"] == year_max] # 後1年は予測評価用
model = GradientBoostingRegressor()
model.fit(train[learn_attrs], train[target_attr])
test.loc[:, "predict"] = model.predict(test[learn_attrs])
return test[["shop_id","date","predict","sales"]].to_dict(orient='records')
学習・予測の関数にはデータはキー(店舗ID)と辞書形式のリストをバリューとするタプルで渡ってくるので、値のリストをDataframeに変換して学習・予測を行います。最後の行では後段のBigQueryのinsertに結果を辞書形式のリストで渡すための変換をしています。
こうしてパイプライン実行するとBigQueryに予測結果と正解データが入るのでSQLで店舗や年ごとなどいろいろな切り口でRMSEなどの指標を算出して評価を行ったりできます。
さいごに
Dataflow上で学習処理まで走らせるのはひょっとするとサービスの用途からは邪道かもしれないけど動かすことができました。今回はBigQueryに作成済みのデータから学習・予測して結果を保存する一本道の単純なパイプラインでしたが、データの加工などを追加したり、評価用データを別のフローから渡したり、結果を予測結果と予測モデルと分岐して後段に出力したりと柔軟に変えることもできるみたいです。今回ハイパーパラメータは決定済みとの想定でしたがパラメータチューニングの大量並列実行なども試してみたいと思います。
Cloud DataflowはGCPの中でもまだあまり注目されていないサービスですが、個人的には機械学習などの複雑なデータ処理を扱うアプリケーションでは面倒になりがちなデータフローの構築や運用をフルマネージドでやってくれるDataflowは、データ分析アプリケーションのAppEngineのような存在になるのではと期待しています。
今回はDataflowに標準でインストールされているscikit-learnを使いましたが実際にはいろいろなライブラリを使いたくなると思います。次回は任意のライブラリをインストールする手順についてxgboostのインストールを例に記載してみたいと思います。
参考
-
公式ドキュメント
-
中井先生のわかりやすいGoogleのデータ処理技術3部作
-
関連コード