はじめに
- Kedroという機械学習向けパイプラインツールを使ってみたので備忘までに記事を書きます。
- とりあえずパイプラインを組んでみて動いたところまで、です。もう少し使い込んで、別途投稿できればと思っています。
- 公式のチュートリアルの差分は、以下くらいかなと思います。(どっちもちょっとしたことですが)
- ホストを汚さないようにDockerコンテナ内でKedroプロジェクトを作成・開発していること
- node.pyは使わず、普段慣れ親しんだディレクトリ構成で普通にスクリプトを書いていること(それらをノードとしてパイプラインでつないでいる)
 
パイプラインツールの必要性
- 依存関係が複雑になりがちな処理処理フローを管理したい
- データ取り込み→データ前処理→モデルのトレーニング→チューニング→デプロイメント、etc
 
- ジョブを並列実行したい
- コンポーネントごとにマシンスペックを柔軟に設定したい
- 前処理は高CPU、学習はGPU、など
 
- リトライやキャッシュの実装を容易にしたい
- メタデータ・データリネージの管理をしたい
- 繰り返しの実行・継続的な改善をしつつ、データ・モデルの再現性を確保したい
 
- 実装を共通化したい。インターフェイスを統一したい
- 実験環境から本番環境への移行を容易にしたい
- コンポーネントごとに独立して開発・改修したい
- チーム間の連携や引き継ぎを容易にしたい
 
パイプラインツールの比較
Kedroとは
* The centre of your data pipeline
* Kedro is a development workflow framework that implements software engineering best-practice for data pipelines with an eye towards productionising machine learning models. We provide a standard approach so that you can:
  * Worry less about how to write production-ready code,
  * Spend more time building data pipelines that are robust, scalable, deployable, reproducible and versioned,
  & And, standardise the way that your team collaborates across your project.
Kedroを使った理由
- Pythonにより、DAG のような形式でパイプラインを定義できる
- yamlによるデータカタログの定義ができる
- csv, pickle, feather, parquet, DB上のテーブル など様々なデータ形式に対応
- 複数のデータ形式や Pandas/Spark の切り替えなどにコードの改変が不要
- -> データカタログとパイプラインを始めに定義しておくことで、チーム開発がやりやすくなる
 
- データが扱いやすい
- データ・中間オブジェクトの管理が可能
- データのキャッシュ機能を活用できる
 
- Cookiecutterによるプロジェクトテンプレートを提供している
- Jupyter Notebook, Jupyter Labとのインテグレーションが整っている
- パラメータ管理がしやすい
- parameters.ymlに記述しておけばstringで簡単に読み出せる
 
- Kedro vizによるパイプラインの可視化ができる ...etc
Kedroの仕組み概要
- 
- プロジェクトのrootディレクトリでkedro runを実行すると、親パイプラインが起動される
 
$ kedro run
- 
- 親パイプラインが実行されて、子パイプラインが起動される
 
src/tutorial/pipeline.py
from tutorial.pipelines import data_engineering as de
from tutorial.pipelines import data_science as ds
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    data_engineering_pipeline = de.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
- 
- 子パイプラインが実行されて、ノード(スクリプト)が起動される
 
src/tutorial/pipelines/data_science/pipeline.py
from .nodes import report_accuracy, train_model
def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=train_model,
                inputs=["example_train_x", "example_train_y", "example_test_x", "example_test_y", "parameters"],
                outputs="example_model",
            ),
            node(func=report_accuracy, inputs=["example_model", "example_test_x", "example_test_y"], outputs=None),
        ]
    )
- 
- ノード(スクリプト)が実行される
 
node.py
def train_model(example_train_x, example_train_y, example_test_x, example_test_y, parameters):
  print('test')
  return example_model
Kedroを利用した開発手順
「はじめに」記載の通り、一部我流です。
1. 環境構築
- Gitリポジトリ作成、コンテナ起動、Kedroセットアップ
$ git clone https://github.com/<user>/<project>.git
$ git checkout -b feature-1
$ git branch -a
$ mkdir -p docker/python382
$ cd docker/python382
$ vim Dockerfile
FROM python:3.8.2
ENV GIT_SSL_NO_VERIFY=1
# apt install
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
         curl \
         git \
         unzip \
         vim \
         wget
# pip install
COPY requirements.txt /tmp/
RUN pip install --upgrade pip && \
    pip install -r /tmp/requirements.txt
# cd
WORKDIR /opt/
# expose port for jupyter notebook
EXPOSE 8888
# expose port for kedro viz
EXPOSE 4141
# exec CMD
CMD kedro run
$ vim requirements.txt
jupyter
kedro
kedro-viz
$ vim run.sh
#!/bin/bash
# usage
SCRIPT_FILE=`basename $0`
function usage()
{
  echo "usage: ${SCRIPT_FILE} docker/nvidia-docker" 1>&2
}
# arguments
if [ $# -ne 1 ]; then
  usage
  exit 1
fi
DOCKER_CMD=$1
# constant
CONTAINER_NAME=<project>
CONTAINER_IMAGE=<project>
HOST_PORT_IPYNB=28888
CONTAINER_PORT_IPYNB=8888
HOST_PORT_KV=24141
CONTAINER_PORT_KV=4141
# main
${DOCKER_CMD} run --name ${CONTAINER_NAME} \
  --privileged \
  --entrypoint bash \
  -v $(pwd)/../../:/opt/<project>/ \
  -p ${HOST_PORT_IPYNB}:${CONTAINER_PORT_IPYNB} \
  -p ${HOST_PORT_KV}:${CONTAINER_PORT_KV} \
  -it ${CONTAINER_IMAGE}
$ docker build -t <project> ./
$ bash run.sh docker
$ cd /opt
$ kedro new
# <project_kedro>(一時的なディレクトリに作成)
# kedro newで作成したファイルを、GitHubのソース側に移動
$ cp -r <project_kedro>/. <project>/
# ディレクトリ変更(好みで)
mkdir src/<pj>/data
mkdir src/<pj>/features
mkdir src/<pj>/models
mkdir src/<pj>/visualization
mkdir src/<pj>/utils
$ git add
$ git commit -m 'set up docker and kedro refs #1'
$ git push -u origin feature-1
ディレクトリ構成例
.
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
├── data
│   ├── 01_raw
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docker
│   └── pytorch_1_4
│       ├── Dockerfile
│       ├── requirements.txt
│       └── run.sh
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── kedro_cli.py
├── logs
│   └── journals
├── notebooks
│   └── forecast_keiba
│       ├── data
│       ├── features
│       ├── models
│       └── visualization
├── setup.cfg
└── src
    ├── forecast_keiba
    │   ├── __init__.py
    │   ├── data
    │   ├── features
    │   ├── models
    │   ├── pipeline.py
    │   ├── pipelines
    │   │   └── __init__.py
    │   ├── run.py
    │   └── visualization
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── pipelines
        │   └── __init__.py
        └── test_run.py
2. [option]試行錯誤
- notebooks配下で試行錯誤
- 参考にするソースとかがあれば、ディレクトリ構成を変更して動くかくらいまで何となく確認しておく+catalogとpipelineのあたりをつけておく
 
3. Input/Outputデータ定義
- catalog定義
- (参考)Kedroユーザーガイド
 
conf/base/catalog.yml
test_score:
  type: pandas.CSVDataSet
  filepath: data/01_raw/test_score.csv
race_results_df:
  type: pickle.PickleDataSet
  filepath: data/01_raw/race_results_df.pickle
  backend: pickl
4. パイプライン定義
親パイプライン定義
- 親パイラプライン定義
src/forecast_keiba/pipeline.py
from typing import Dict
from kedro.pipeline import Pipeline
from forecast_keiba.pipelines import netkeiba_base_lr
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    """Create the project's pipeline.
    Args:
        kwargs: Ignore any additional arguments added in the future.
    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    netkeiba_base_lr_pipeline = netkeiba_base_lr.create_pipeline()
    return {
        "netkeiba_base_lr": netkeiba_base_lr_pipeline,
        "__default__": netkeiba_base_lr_pipeline
    }
子パイプライン定義
- パッケージとして読み込むために、init.pyの更新も忘れない
src/forecast_keiba/pipelines/__init__.py
from .pipeline import create_pipeline
- 子パイプライン定義
src/forecast_keiba/pipelines/pipeline.py
from kedro.pipeline import Pipeline, node
import sys
import os
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(FILE_DIR + "/../../")
from data import scraping_netkeiba
from features import preprocess_race_results_base
from models import train_lr
from models import predict_lr
def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=scraping_netkeiba.scraping_netkeiba,
                inputs=[],
                outputs="race_results_df"
            ),
            node(
                func=preprocess_race_results_base.preprocess_race_results_base,
                inputs=["race_results_df"],
                outputs="race_results_df_processed_base"
            ),
            node(
                func=train_lr.train_lr,
                inputs=["race_results_df_processed_base"],
                outputs="model_lr"
            ),
            node(
                func=predict_lr.predict_lr,
                inputs=["model_lr", "parameters"],
                outputs=None
            ),
        ]
    )
4. ノード実装
- まず空で作っておく
- Inが引数で、Outがreturnで流れていくように
- -> パイプラインが通しで実行できるかどうかを確認しておく
 
- ノードの中身を実装していく
src/forecast_keiba/models/predict_lr.py
def predict_lr(model_lr, parameters):
    predict_race_id = parameters["predict_race_id"]
    # outputsがある場合は、returnする
def main(model_lr, parameters):
    return predict_lr(model_lr, parameters)
if __name__ == "__main__":
    main(model_lr, parameters)
5. パイプライン実行
- パイプライン実行
$ kedro run
[option]kedro-vizによる可視化
- kedro-vizの起動(デフォルトはポート4141)
$ kedro viz --host 0.0.0.0
 
(おまけ)触ってみて得たちょっとした知見
- リソースはDBテーブルも指定できる
- SQL発行をできる
 
- kedro runで実行されるrunは、kedro_cli.pyで定義されている
- ノートブックからもconfigにアクセスすることができる
- srcは、pipelineのin/outで書く(から、不要)が、jupyterは自分で最初に書く(catalogライブラリ自体のimportはいらない)
 
$ kedro jupyter notebook --allow-root --port=8888 --ip=0.0.0.0 &
df_test_score = catalog.load("test_score")
df_test_score_aggregated = df_test_score.groupby('class').mean()
catalog.save("test_score_aggregated", df_test_score_aggregated)
from kedro.framework.context import load_context
proj_path = '../../../' 
context = load_context(proj_path)
# df = catalog.load("XXX")
parameters = context.params
(おまけ)気になっていること
- 画像のcatalog定義
- ディレクトリ指定
 
- キャッシュが効く/効かない条件
- MLflowとの連携
- papermillを使ってjupyter notebookでバッチ実行
- パイプラインが落ちた地点からの再実行
- 並列実行
- confの環境ごとの使い分け
- kedro lint, kedro journal, context


