notebookで全部やるのはもう限界なので「機械学習 パイプライン」でググってなにか見つけたKedroを試してみます。
調べる
https://qiita.com/Minyus86/items/70622a1502b92ac6b29c
これを見てGokartかPipelineXかなと思う。
Gokart https://speakerdeck.com/vaaaaanquish/gokartfalseyun-yong-toke-ti-nituite エムスリーだって。
PipelineXは更新されてないな。
Kedroはドキュメントがいいカンジだな。
https://cyberagent.ai/blog/research/12898/
「Neptune.ai, Comet.ml, Weights & Biases それぞれMLスタートアップの提供するSaaSの実験管理ツールで、Python APIを通じて実験ログをSaaSに流し、Web UI上で実験の可視化や共有ができます。」
Kedroとこのへんつないだらお手軽でいいんじゃないかな。
とりあえずnotebookのカオスをどうにかしたい。ので簡単そうなこれでいってみたい。
ドキュメントがとても丁寧なので順番にやっていけばできそうな気がします。
https://kedro.readthedocs.io/en/stable/index.html
チュートリアル以前
インストール
pipで。
Hello Kedro!
ふつうはkedro newとかkedro installとかしてrails newみたいにプロジェクト作って、ディレクトリ構成がばーって出来て・・・っていうフレームワークなんだけど、1ファイルでkedroの雰囲気を説明しているやつがこれ。
"""Contents of hello_kedro.py"""
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner
# Prepare a data catalog
data_catalog = DataCatalog({"example_data": MemoryDataSet()})
# Prepare first node
def return_greeting():
return "Hello"
return_greeting_node = node(
return_greeting, inputs=None, outputs="my_salutation"
)
# Prepare second node
def join_statements(greeting):
return f"{greeting} Kedro!"
join_statements_node = node(
join_statements, inputs="my_salutation", outputs="my_message"
)
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
- タスク(return_greeting,join_statements)をpythonの関数として実装。
- nodeとしてタスクをwrapしてパラメータや処理順をゴニョゴニョとカスタムしやすく
- DataCatalogっていうところにデータのありかを登録しておく
- runnerってのが全部まとめていいカンジに実行
Iris
kedro newしてプロジェクトをつくるときに最後にYってするとIrisのデータと前処理&学習&推論実行が例として出力されるのでこれでKedroプロジェクトのディレクトリ構成、ファイル構成の雰囲気がわかる。あれだ、scaffold。
02-iris/get-startedがプロジェクトのルートディレクトリ。
前処理のコード
02-iris/get-started/src/get_started/pipelines/data_engineering/nodes.py
from typing import Any, Dict
import pandas as pd
def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> Dict[str, Any]:
"""Node for splitting the classical Iris data set into training and test
sets, each split into features and labels.
The split ratio parameter is taken from conf/project/parameters.yml.
The data and the parameters will be loaded and provided to your function
automatically when the pipeline is executed and it is time to run this node.
"""
data.columns = [
"sepal_length",
"sepal_width",
"petal_length",
"petal_width",
"target",
]
classes = sorted(data["target"].unique())
# One-hot encoding for the target variable
data = pd.get_dummies(data, columns=["target"], prefix="", prefix_sep="")
# Shuffle all the data
data = data.sample(frac=1).reset_index(drop=True)
# Split to training and testing data
n = data.shape[0]
n_test = int(n * example_test_data_ratio)
training_data = data.iloc[n_test:, :].reset_index(drop=True)
test_data = data.iloc[:n_test, :].reset_index(drop=True)
# Split the data to features and labels
train_data_x = training_data.loc[:, "sepal_length":"petal_width"]
train_data_y = training_data[classes]
test_data_x = test_data.loc[:, "sepal_length":"petal_width"]
test_data_y = test_data[classes]
# When returning many variables, it is a good practice to give them names:
return dict(
train_x=train_data_x,
train_y=train_data_y,
test_x=test_data_x,
test_y=test_data_y,
)
前処理のコードをラップ
02-iris/get-started/src/get_started/pipelines/data_engineering/pipeline.py
ラップするのは「パラメータ、入力、出力」を定義するため。
from kedro.pipeline import Pipeline, node
from .nodes import split_data
def create_pipeline(**kwargs):
return Pipeline(
[
node(
split_data,
["example_iris_data", "params:example_test_data_ratio"],
dict(
train_x="example_train_x",
train_y="example_train_y",
test_x="example_test_x",
test_y="example_test_y",
),
)
]
)
学習&推論&結果出力
02-iris/get-started/src/get_started/pipelines/data_science/nodes.py
numpyで、、、これは、、、シグモイド関数。。。最急降下法?ニュートン法?なんだっけ?ともかくscikit-learnとかlightgbmとかじゃなくて自前でやってる。
import logging
from typing import Any, Dict
import numpy as np
import pandas as pd
def train_model(
train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
) -> np.ndarray:
"""Node for training a simple multi-class logistic regression model. The
number of training iterations as well as the learning rate are taken from
conf/project/parameters.yml. All of the data as well as the parameters
will be provided to this function at the time of execution.
"""
num_iter = parameters["example_num_train_iter"]
lr = parameters["example_learning_rate"]
X = train_x.to_numpy()
Y = train_y.to_numpy()
# Add bias to the features
bias = np.ones((X.shape[0], 1))
X = np.concatenate((bias, X), axis=1)
weights = []
# Train one model for each class in Y
for k in range(Y.shape[1]):
# Initialise weights
theta = np.zeros(X.shape[1])
y = Y[:, k]
for _ in range(num_iter):
z = np.dot(X, theta)
h = _sigmoid(z)
gradient = np.dot(X.T, (h - y)) / y.size
theta -= lr * gradient
# Save the weights for each model
weights.append(theta)
# Return a joint multi-class model with weights for all classes
return np.vstack(weights).transpose()
def predict(model: np.ndarray, test_x: pd.DataFrame) -> np.ndarray:
"""Node for making predictions given a pre-trained model and a test set.
"""
X = test_x.to_numpy()
# Add bias to the features
bias = np.ones((X.shape[0], 1))
X = np.concatenate((bias, X), axis=1)
# Predict "probabilities" for each class
result = _sigmoid(np.dot(X, model))
# Return the index of the class with max probability for all samples
return np.argmax(result, axis=1)
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
"""Node for reporting the accuracy of the predictions performed by the
previous node. Notice that this function has no outputs, except logging.
"""
# Get true class index
target = np.argmax(test_y.to_numpy(), axis=1)
# Calculate accuracy of predictions
accuracy = np.sum(predictions == target) / target.shape[0]
# Log the accuracy of the model
log = logging.getLogger(__name__)
log.info("Model accuracy on test set: %0.2f%%", accuracy * 100)
def _sigmoid(z):
"""A helper sigmoid function used by the training and the scoring nodes."""
return 1 / (1 + np.exp(-z))
学習&推論&結果出力をラップ
02-iris/get-started/src/get_started/pipelines/data_science/pipeline.py
入力と出力をつないでいる。
from kedro.pipeline import Pipeline, node
from .nodes import predict, report_accuracy, train_model
def create_pipeline(**kwargs):
return Pipeline(
[
node(
train_model,
["example_train_x", "example_train_y", "parameters"],
"example_model",
),
node(
predict,
dict(model="example_model", test_x="example_test_x"),
"example_predictions",
),
node(report_accuracy, ["example_predictions", "example_test_y"], None),
]
)
元データの設定ファイル
02-iris/get-started/conf/base/catalog.yml
example_iris_data:
type: pandas.CSVDataSet
filepath: data/01_raw/iris.csv
パラメータ設定ファイル
02-iris/get-started/conf/base/parameters.yml
example_test_data_ratio: 0.2
example_num_train_iter: 10000
example_learning_rate: 0.01
notebook
02-iris/get-started/notebooks
というディレクトリもあるのでここでフレームワークに組み込んだnodeを呼んでnotebookで実験しつつ、まとまったらnode化して・・・とできるんだと思われる。
なるほど、これは整理できそう!
(続く)