My notebooks are already a total mess. What's a "machine learning pipeline"? → How about Kedro?

Posted at 2020-09-30

Doing everything in notebooks has hit its limit, so I googled "machine learning pipeline", found Kedro, and am giving it a try.

Looking into it

https://qiita.com/Minyus86/items/70622a1502b92ac6b29c
Reading this, I was leaning toward Gokart or PipelineX.

Gokart https://speakerdeck.com/vaaaaanquish/gokartfalseyun-yong-toke-ti-nituite (it's by M3, apparently).
PipelineX doesn't seem to be getting updates.
Kedro's documentation looks pretty good.

https://cyberagent.ai/blog/research/12898/
"Neptune.ai, Comet.ml, and Weights & Biases are SaaS experiment-management tools, each offered by an ML startup; you stream experiment logs to the SaaS through a Python API and can visualize and share experiments in a web UI."

Hooking Kedro up to one of these seems like it would be an easy, solid setup.
For now I just want to do something about the chaos in my notebooks, so I'll go with this one since it looks simple.

The documentation is very thorough, so it feels like I can pull this off just by working through it in order.
https://kedro.readthedocs.io/en/stable/index.html

Before the tutorial

Installation

With pip.
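
For example, into a clean virtualenv (the post dates to 2020, so the code below matches the Kedro 0.16.x line):

# install the CLI and library
pip install kedro
# sanity check: prints the Kedro banner and version
kedro info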

Hello Kedro!

Normally you'd run kedro new, kedro install, and the like, rails new style: create a project, have a whole directory structure generated, and so on. That's the kind of framework this is, but here's the one-file example that conveys the feel of Kedro.

"""Contents of hello_kedro.py"""
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"example_data": MemoryDataSet()})

# Prepare first node
def return_greeting():
    return "Hello"


return_greeting_node = node(
    return_greeting, inputs=None, outputs="my_salutation"
)

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"


join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

  • Implement the tasks (return_greeting, join_statements) as plain Python functions.
  • Wrap each task in a node, which makes its inputs, outputs, and ordering easy to customize.
  • Register where the data lives in a DataCatalog.
  • A runner ties everything together and runs it nicely (see the run example after this list).
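
Save it as hello_kedro.py and run it; the runner returns the pipeline's one "free" output (my_message is not registered in the catalog), so you should see:

python hello_kedro.py
{'my_message': 'Hello Kedro!'}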

Iris

When you create a project with kedro new, answering Y at the final prompt generates the Iris data plus example preprocessing, training, and inference pipelines, so you can get a feel for a Kedro project's directory and file layout. You know, a scaffold.

02-iris/get-started is the project's root directory.
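
Pieced together from the paths that appear below, the generated layout looks roughly like this (trimmed; details vary by Kedro version):

get-started/
├── conf/
│   └── base/
│       ├── catalog.yml
│       └── parameters.yml
├── data/
│   └── 01_raw/
│       └── iris.csv
├── notebooks/
└── src/
    └── get_started/
        └── pipelines/
            ├── data_engineering/
            │   ├── nodes.py
            │   └── pipeline.py
            └── data_science/
                ├── nodes.py
                └── pipeline.py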

The preprocessing code

02-iris/get-started/src/get_started/pipelines/data_engineering/nodes.py

from typing import Any, Dict

import pandas as pd


def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> Dict[str, Any]:
    """Node for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    The split ratio parameter is taken from conf/project/parameters.yml.
    The data and the parameters will be loaded and provided to your function
    automatically when the pipeline is executed and it is time to run this node.
    """
    data.columns = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
        "target",
    ]
    classes = sorted(data["target"].unique())
    # One-hot encoding for the target variable
    data = pd.get_dummies(data, columns=["target"], prefix="", prefix_sep="")

    # Shuffle all the data
    data = data.sample(frac=1).reset_index(drop=True)

    # Split to training and testing data
    n = data.shape[0]
    n_test = int(n * example_test_data_ratio)
    training_data = data.iloc[n_test:, :].reset_index(drop=True)
    test_data = data.iloc[:n_test, :].reset_index(drop=True)

    # Split the data to features and labels
    train_data_x = training_data.loc[:, "sepal_length":"petal_width"]
    train_data_y = training_data[classes]
    test_data_x = test_data.loc[:, "sepal_length":"petal_width"]
    test_data_y = test_data[classes]

    # When returning many variables, it is a good practice to give them names:
    return dict(
        train_x=train_data_x,
        train_y=train_data_y,
        test_x=test_data_x,
        test_y=test_data_y,
    )

Wrapping the preprocessing code

02-iris/get-started/src/get_started/pipelines/data_engineering/pipeline.py

The point of wrapping is to declare the node's parameters, inputs, and outputs.


from kedro.pipeline import Pipeline, node

from .nodes import split_data


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                split_data,
                ["example_iris_data", "params:example_test_data_ratio"],
                dict(
                    train_x="example_train_x",
                    train_y="example_train_y",
                    test_x="example_test_x",
                    test_y="example_test_y",
                ),
            )
        ]
    )

Training, inference, and outputting results

02-iris/get-started/src/get_started/pipelines/data_science/nodes.py

numpy, and... this is... a sigmoid function... steepest descent? Newton's method? Which was it? (It's plain batch gradient descent for one-vs-rest logistic regression; see the update rule below.) In any case, it's hand-rolled rather than scikit-learn or lightgbm.
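
Reading the inner loop off the code, per class the node just repeats the standard logistic-regression gradient step, where m is the number of samples (this is exactly gradient = np.dot(X.T, (h - y)) / y.size below):

$$h = \sigma(X\theta), \qquad \sigma(z) = \frac{1}{1 + e^{-z}}, \qquad \theta \leftarrow \theta - \mathrm{lr} \cdot \frac{1}{m} X^\top (h - y)$$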


import logging
from typing import Any, Dict

import numpy as np
import pandas as pd


def train_model(
    train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any]
) -> np.ndarray:
    """Node for training a simple multi-class logistic regression model. The
    number of training iterations as well as the learning rate are taken from
    conf/project/parameters.yml. All of the data as well as the parameters
    will be provided to this function at the time of execution.
    """
    num_iter = parameters["example_num_train_iter"]
    lr = parameters["example_learning_rate"]
    X = train_x.to_numpy()
    Y = train_y.to_numpy()

    # Add bias to the features
    bias = np.ones((X.shape[0], 1))
    X = np.concatenate((bias, X), axis=1)

    weights = []
    # Train one model for each class in Y
    for k in range(Y.shape[1]):
        # Initialise weights
        theta = np.zeros(X.shape[1])
        y = Y[:, k]
        for _ in range(num_iter):
            z = np.dot(X, theta)
            h = _sigmoid(z)
            gradient = np.dot(X.T, (h - y)) / y.size
            theta -= lr * gradient
        # Save the weights for each model
        weights.append(theta)

    # Return a joint multi-class model with weights for all classes
    return np.vstack(weights).transpose()


def predict(model: np.ndarray, test_x: pd.DataFrame) -> np.ndarray:
    """Node for making predictions given a pre-trained model and a test set.
    """
    X = test_x.to_numpy()

    # Add bias to the features
    bias = np.ones((X.shape[0], 1))
    X = np.concatenate((bias, X), axis=1)

    # Predict "probabilities" for each class
    result = _sigmoid(np.dot(X, model))

    # Return the index of the class with max probability for all samples
    return np.argmax(result, axis=1)


def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
    """Node for reporting the accuracy of the predictions performed by the
    previous node. Notice that this function has no outputs, except logging.
    """
    # Get true class index
    target = np.argmax(test_y.to_numpy(), axis=1)
    # Calculate accuracy of predictions
    accuracy = np.sum(predictions == target) / target.shape[0]
    # Log the accuracy of the model
    log = logging.getLogger(__name__)
    log.info("Model accuracy on test set: %0.2f%%", accuracy * 100)


def _sigmoid(z):
    """A helper sigmoid function used by the training and the scoring nodes."""
    return 1 / (1 + np.exp(-z))

Wrapping training, inference, and outputting results

02-iris/get-started/src/get_started/pipelines/data_science/pipeline.py

This wires up the inputs and outputs.

from kedro.pipeline import Pipeline, node

from .nodes import predict, report_accuracy, train_model


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                train_model,
                ["example_train_x", "example_train_y", "parameters"],
                "example_model",
            ),
            node(
                predict,
                dict(model="example_model", test_x="example_test_x"),
                "example_predictions",
            ),
            node(report_accuracy, ["example_predictions", "example_test_y"], None),
        ]
    )
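
By the way, nothing above actually combines the two pipelines. In this era's starter that happens in the project hooks; a rough sketch, assuming the starter's "de"/"ds" registration names (the exact file and hook signature vary by Kedro version):

from typing import Dict

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline

from get_started.pipelines import data_engineering as de
from get_started.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        de_pipeline = de.create_pipeline()
        ds_pipeline = ds.create_pipeline()
        # Pipelines compose with "+"; "__default__" is what a bare `kedro run` executes
        return {
            "de": de_pipeline,
            "ds": ds_pipeline,
            "__default__": de_pipeline + ds_pipeline,
        }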

Config file for the source data

02-iris/get-started/conf/base/catalog.yml

example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv
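
Only the raw CSV is registered; everything else (example_train_x, example_model, ...) falls back to an in-memory dataset and disappears when the run ends. To persist the trained model you would register it too, something like this (the entry and the data/06_models path are my guess, following Kedro's data-layer naming convention):

example_model:
  type: pickle.PickleDataSet
  filepath: data/06_models/example_model.pkl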

Parameter config file

02-iris/get-started/conf/base/parameters.yml

example_test_data_ratio: 0.2
example_num_train_iter: 10000
example_learning_rate: 0.01
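
With the catalog and parameters in place, everything runs from the project root via the CLI. A bare kedro run executes the default pipeline; the named variant assumes the "de"/"ds" registration sketched earlier:

kedro run
kedro run --pipeline de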

notebook

02-iris/get-started/notebooks

There is also this directory, so presumably the loop is: experiment in a notebook while calling the nodes already folded into the framework, and once something gels, turn it into a node... and repeat.
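
As a sketch of that loop: kedro jupyter notebook starts Jupyter with the project's catalog already injected (variable names differ slightly across Kedro versions), so in a cell you can do:

# Load the raw data exactly as the pipeline would see it
df = catalog.load("example_iris_data")

# Call a node function directly while experimenting
from get_started.pipelines.data_engineering.nodes import split_data
splits = split_data(df, example_test_data_ratio=0.2)
splits["train_x"].head()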

Nice, this looks like it could actually tidy things up!

(To be continued)
