概要
TL;DR
単発のデータ分析案件を想定し、1つのjupyter notebookファイル内で以下の工程を完結させます。
- データをローカルで前処理して Google Cloud Storage にデータを格納する
- 機械学習コードを書いて、Cloud ML Engine で実行する
- Cloud Storage に保存されたモデルをローカルにダウンロードして予測を実行する
デモとして有名なkaggleのtitanicの生存者予測をやっています。
背景
自分は普段、データ分析業務の中で機械学習モデルの作成を行うことが多いのですが、
- 分析元となるデータは機密情報だが、あまりローカルの端末で持ちたくない。
- ローカルのPCでGPUが使えず学習が遅いので早くしたい。
という課題がありました。
これを、GCPの諸々のツールを使って解決しようというのが今回の記事の内容です。
また、分析環境としてjupyter notebookを使っています。普通のエディタとnotebookとを行ったり来たりするのは面倒なので、上記の内容を1つのnotebookファイル内で行うことでシンプルで効率的な分析を実現させる狙いです。
想定読者
- データ分析業務で機械学習を行っているがローカルマシンで学習をしており、マシンパワーを上げて短時間でモデル作成したい人
- ML Engine とか聞いたことあるけど、実際どんな風に学習ができるのか知りたい人
- データ分析でjupyter notebookを使っている人
- どちらかというとエンジニアではなくデータサイエンティスト向けですね
ツールの構成
- データのダウンロードから前処理まで: ローカル端末
- 前処理済みデータの格納先: Cloud Storage
- モデルのトレーニグ: ML Engine
- モデルを用いた予測: ローカル端末
というシンプルな構成でいきます。
なぜこのような構成にしたか
データ分析では変数を変えたりパラメータを弄ったりとトライアンドエラーが多いですが、このとき学習時間が遅いのがボトルネックとなり試行回数が減ってしまいがちです。しかしそれ以外の前処理や予測はローカルで実行したほうが早いです(自分の今の環境では)。そこで、モデルは最終的にローカルに置きつつ、学習をML Engineで実行することで高速なデータ分析実行を目的とする構成です。
※ とはいえ、いずれはDataLabなどを使って分析業務自体もクラウド化を進めたいと思っていますが、それはまたいずれ。
今回のコード
自分のGitHubリポジトリに置いてあります。よければ参考にしてみてください。
リポジトリ: https://github.com/kazuki-hayakawa/sklearn_mlengine_sample
デモ用notebook: https://github.com/kazuki-hayakawa/sklearn_mlengine_sample/blob/master/Cloud_ML_Engine_sklearn_sample.ipynb
デモ:titanicの生存者予測(from kaggle)
以下のデモの内容はGitHubにアップロードしたnotebookにまとめてあります。
1. 準備
1.1 Cloud Storage にバケットを作成する
データを格納するバケットを作成します。
同一プロジェクト内のバケット名の重複はエラーになるのでご注意ください。
また、事前にサービスアカウントのjsonファイルの発行や、環境変数の設定、およびCloud SDKのセットアップをして gcloud auth login
は済んでいる状況を想定しています。
# 何故か環境変数のPATHが通ってないので google api alient の json ファイルのパスを通しておく。 # 何故か環境
# この辺は個々人の環境設定によるかもしれません。
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'YOUR/PATH/SERVICE_ACCOUNT_KEY.json'
# 後述で subprocess を使って実行している箇所もあります。
# せっかくPythonライブラリもあるのでこっちも使ってみたかった。
from google.cloud import storage as gcs
from googleapiclient import errors
project_name = 'YOUR_PROJECT_NAME'
gcs_client = gcs.Client(project_name)
bucket_name = 'sklearn-sample'
# バケット作成
# すでに存在するバケット名の場合は作成エラーになるので回避
if bucket_name not in [b.name for b in gcs_client.list_buckets()]:
bucket = gcs_client.create_bucket(bucket_name)
print('Bucket {} created.'.format(bucket.name))
else:
print("You already own bucket {}.".format(bucket_name))
1.2 データセットをダウンロードして前処理してCloud Storageに格納する
kaggle からデータをダウンロードします。
jupyter上でシェルスクリプトが書けるマジックを使用します。
%%bash
DATA_DIR="${PWD}/dataset/"
kaggle competitions download -c titanic -p $DATA_DIR --force
前処理してデータをアップロードします。
import numpy as np
import pandas as pd
# アップロード用関数を作成する。サブディレクトリを指定して送信できるようひと工夫。
def file_upload(local_filepath, bucket, bucket_filename=None):
"""
bucket: 格納先バケットオブジェクト
bucket_filename: バケット先に設定したいパスを含んだファイル名
"""
if bucket_filename is None:
# 名前がないときはtmpディレクトリを作成してローカルのファイル名と同じで格納する
bucket_filename = 'tmp/' + os.path.basename(local_filepath)
blob = bucket.blob(bucket_filename)
blob.upload_from_filename(local_filepath)
# データ前処理を行う関数
def preprocessing(df):
#欠損値処理
df['Fare'] = df['Fare'].fillna(df['Fare'].median())
df['Age'] = df['Age'].fillna(df['Age'].median())
#カテゴリ変数の変換
df['Sex'] = df['Sex'].apply(lambda x: 1 if x == 'male' else 0)
df = df.drop(['Cabin','Name','PassengerId','Ticket','Embarked'],axis=1)
df = df.astype('float64')
return df
# データ前処理 # データ前処
train_data = './dataset/train.csv'
test_data = './dataset/test.csv'
df_train = pd.read_csv(train_data)
df_test = pd.read_csv(test_data)
df_train = preprocessing(df_train)
df_test = preprocessing(df_test)
# 前処理結果を上書きで保存
df_train.to_csv(train_data, index=False)
df_train.to_csv(test_data, index=False)
# データ送信
bucket = gcs_client.bucket(bucket_name)
file_upload(train_data, bucket, bucket_filename='dataset/train.csv')
file_upload(test_data, bucket, bucket_filename='dataset/test.csv')
2. モデルトレーニグの記述
2.1 jupyterマジックコマンドの設定
今回は特定のトレーニングコード部分のセルのみを.pyファイルに記述してパッケージ化してML Engine上に送信して動かす必要があります。
そのため、任意のセルのみを.pyファイルに記述できるマジックコマンドを自作しました。コードはこちらです。
使い方は以下のとおりです。
-
%%mlcodes
でコードを記述。run_local
と続けて記載するとローカルのjupyterでも同じコードを動かす。- 重たい学習で、ローカルで動かしたくないときは run_local を書かない
-
%code_to_pyfile
でコードを./trainer/task.py
に保存。これを行うまではコードは保存されない。 -
%clear_mlcode
でコードの初期化。%%mlcodes
のセルを叩くたびに append されるので、同じセルをたくさん叩いちゃったときに直す用。
jupyterで有効にするには以下を実行します。
# マジックを有効にする
from lib.mlcodemagic import MLCodeMagic
ip = get_ipython()
ip.register_magics(MLCodeMagic)
2.2 モデル記述
モデル部分は以下のようになります。
実際はセルに書きながら実行していますが、見やすくするためにひとまとめにします。実際の様子はnotebookファイルをご覧ください。
モデルはpickle形式で gs://sklearn-sample/models/model.pkl
に保存しておきます。
import os
import pickle
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.python.lib.io import file_io
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
bucket_path = "gs://sklearn-sample"
train_file_path = "{}/dataset/train.csv".format(bucket_path)
# get train data
# CloudML上からgoogle cloud storage上のファイルにアクセスする際には、tensorflowのfile_ioパッケージを使う必要がある
with file_io.FileIO(train_file_path,'r') as f:
df = pd.read_csv(f)
y = np.array(df['Survived'], dtype='int8')
del df['Survived']
X = np.array(df, dtype='float64')
# train valid split
# 今回はただfitしてるだけなのでtrain_test_splitは不要ですが、交差検証するとき等では必要ですね。
X_train, X_validation, y_train, y_validation = train_test_split(
X, y, test_size=0.2, random_state=0)
# make model
clf = RandomForestClassifier()
clf.fit(X_train, y_train)
# save model
model_name = 'model.pkl'
with file_io.FileIO(os.path.join(bucket_path,'model',model_name), 'wb') as model_file:
pickle.dump(clf, model_file)
2.3 学習の実行
トレーニング部分のコードが出来上がり、ローカルの ./trainer/task.py
として保存されたら、ML Engineにジョブを投げて学習を実行させます。
import os
from datetime import datetime
import subprocess
# ジョブ定義
# ジョブの名前は何でもいいです。 今回は job_[日付] にしました。
bucket_path = "gs://sklearn-sample"
job_name = datetime.now().strftime("job_%Y%m%d_%H%M%S")
job_dir = os.path.join(bucket_path, job_name)
# runtime-version 1.4 以上でないと python 3.5 が使えないので注意
# scale-tier BASIC_GPU で GPU指定可能
job_cmd = """gcloud ml-engine jobs submit training {0}
--job-dir {1}
--module-name trainer.task
--package-path trainer
--staging-bucket {2}
--region us-central1
--runtime-version 1.6
--python-version 3.5
--scale-tier BASIC_GPU
""".format(job_name, job_dir, bucket_path)
# ジョブ実行
subprocess.run(job_cmd.split())
ジョブを投げたら学習が完了するまでコーヒーでも飲みながらしばらくお待ち下さい。
3. 学習済みモデルをダウンロードして予測を実行
cloud storage から学習済みモデルをダウンロードしてローカルで予測を実行します。
import pickle
# 学習済みモデルをローカルにダウンロード
model_dir = "./models"
if not os.path.exists(model_dir):
os.mkdir(model_dir)
bucket_path = "gs://sklearn-sample"
model_name = 'model.pkl'
gcs_model_path = os.path.join(bucket_path, 'model', model_name)
dl_cmd = "gsutil cp {0} {1}".format(gcs_model_path, model_dir)
subprocess.run(dl_cmd.split())
# モデルのロード
with open(os.path.join(model_dir, model_name),'rb') as f:
clf = pickle.load(f)
# 予測する
X_test = np.array(df_test, dtype='float64')
y_pred = clf.predict(X_test)
所感
インフラやクラウドの知識がなかったので良い勉強になりました。最初はチュートリアルのコードを解読するのが苦労しましたが、慣れれば意外と簡単に扱えて驚いています。
インフラエンジニアじゃなくてもクラウドで機械学習を実行する環境が作れるのはさすがGCPだなと。
単発の分析案件を想定しているので、前処理や予測の作成はローカルで行っていますが、プロダクトの運用に乗るようなモデルを開発するときは前処理はData Flowで、予測はOnline Predictionで実装できるようになると今後も幅が広がりそうなのでチャレンジしてみたいです。
GCPのML Engineは記事も少ないので、この記事が似たような悩みを持っている人の助けになれば幸いです。