St_Hakky’s blog

Data Science / Human Resources / Web Applicationについて書きます

【 Kedro】Kedroに入門したのでまとめる

こんにちは。

最近、Kedroと言う機械学習向けのパイプライン構築用のツールを使ってみたので、それについてまとめます。

Kedroとは?

f:id:St_Hakky:20201127110345p:plain

概要

Kedro は QuantumBlack というデータ分析企業 が公開している、プロダクションレディなデータ分析用ワークフロー構築ツールです。結構いろんなパイプラインツールがありますし、全部を触ったことがあるわけではないですが、今のところ*1Kedroはすごくすごく良い感じです。

ツールの紹介動画は以下*2

youtu.be

公式周りのサイトのURLは以下の通り。

特徴

めちゃめちゃたくさんの特徴がありますが、以下あたりはすごく便利です。

  • Airflowなどと同じく、Pythonで全てのワークフローを書くことができる
  • DAG形式でパイプラインを定義でき、 Sequentialな実行とParallelな実行の切り替えや、パイプラインの途中から実行するなどできる
  • yamlで定義することができるデータカタログの機能があり、csv, pickle, feather, parquet, DB上のテーブル など様々なデータ形式に対応することができる
  • データセットや学習モデルをバージョン管理し、指定のバージョンでいつでも実行できるよう再現性を担保する
  • Cookiecutter によるテンプレートを利用することで、複数人での作業を管理できる
  • モデルのパラメーターをyamlで管理することができる
  • Jupyter Notebook, Jupyter Lab とのインテグレーション
  • 本番環境への移行がしやすく、複数の環境(AWS, GCPのようなマネージドサービスなど)にデプロイすることができる。具体的にはPythonのパッケージとしてであったり、kubeflowやArgo Workflows、AWS Batchにデプロイなどもできる
  • リッチなパイプラインの可視化がKedro vizでできる

とりあえずやりたいことは一通りできそうな雰囲気を感じ取っていただけると思います笑

Kedroの大まかな構成要素

Kedroは、大まかには次の4つから構成されています。

  • Node
  • Pipeline
  • DataCatalog
  • Runner
Node
  • 実行される処理の単位で、前処理や学習といった処理本体になるもの
  • 入力、出力のデータセットと、それを処理するロジックを定義して、パイプラインに組み込む
Pipeline
  • Nodeの依存関係や実行順序を管理するもの。
  • decorator機能がkedroにはあり、これによって、パイプライン全体の処理に対して機能を付加することもできる
DataCatalog
  • パイプラインで使用するデータを定義するカタログ
  • データセット名、形式、ファイルパス、ロードやセーブ時のオプションなどを指定することが可能
Runner
  • パイプラインを実行するもの。パラメーターを指定して実行することができ、例えば特定のパイプラインだけ実行するとかもできる。
  • SequentialRunner、ParallelRunnerの二つのRunnerがある。

Install

普通にpipとかcondaでインストールできます。

# pipとかcondaでインストールできます
$ pip install kedro
$ conda install -c conda-forge kedro

 # installされたかの確認。kedroっていう文字が出てきたら成功
$ kedro info

 _            _
| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
|   <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.16.6

kedro allows teams to create analytics
projects. It is developed as part of
the Kedro initiative at QuantumBlack.

No plugins installed

新規のプロジェクトを作成する

kedroのデフォルトのテンプレートを使う場合は、以下のコマンドでやります。いくつか質問が出てきますが、それぞれ読んで答えればオッケー。

$ kedro new

テンプレートをオンにすると、以下のような感じでフォルダが構成されます。

$ tree .

.
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── credentials.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
├── data
│   ├── 01_raw
│   │   └── iris.csv
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── kedro_cli.py
├── logs
│   └── journals
├── notebooks
├── setup.cfg
└── src
    ├── requirements.txt
    ├── sample
    │   ├── __init__.py
    │   ├── hooks.py
    │   ├── pipelines
    │   │   ├── __init__.py
    │   │   ├── data_engineering
    │   │   │   ├── README.md
    │   │   │   ├── __init__.py
    │   │   │   ├── nodes.py
    │   │   │   └── pipeline.py
    │   │   └── data_science
    │   │       ├── README.md
    │   │       ├── __init__.py
    │   │       ├── nodes.py
    │   │       └── pipeline.py
    │   └── run.py
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── pipelines
        │   └── __init__.py
        └── test_run.py

プロジェクトをgitの管理下にするには、以下のような感じでやります。

$ git init
$ git add ./
$ git commit -m "init"
$ git branch -M main
$ git remote add origin <hogehoge>
$ git push origin main

フォルダ構成からわかると思いますが、どこに何を書くかが明確で、サンプルのコードを追いかけるだけで、大体何すればいいかわかります笑*3

とりあえず動かしてみる

kedro newをしたときに、irisのデータセットでサンプルのモデルを動かすためのパイプラインやノード、データカタログが用意されているので、とりあえずこいつをローカルで動かしてみたいと思います。

$ cd <プロジェクトルート>

# まずはプロジェクトの依存関係をインストール
$ kedro install

# 実行
$ kedro run

これだけです。簡単ですね。実行すると、logのフォルダにログが吐き出されるのがわかります。

次からは、処理を実際に追加していくのをどうするか見ていきます。

データソースを追加する(Data Catalogを追加する)

Nodeなどでデータを実際に使うために、データカタログにデータソースを追加します。conf/base/catalog.ymlというファイルに記述していきます。

csvとかであれば、以下のような感じで追加できます。他にも、xlsxやparquet, sqlTableなど様々なデータソースに対応できます。

companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv


dataのディレクトリに、それぞれのデータの状態に合わせてフォルダにデータを入れます。ここら辺も人によってフォルダの分け方が別れることがほとんどですが、事前に定義がされているのでやりやすいです。


処理を追加する(Nodeを編集する)

nodeの処理は単純で、普通に関数を追加するだけです(完)

試しに、テンプレートで出てくるnodes.pyの処理を見て見ます。

from typing import Any, Dict

import pandas as pd


def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> Dict[str, Any]:
    """Node for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    The split ratio parameter is taken from conf/project/parameters.yml.
    The data and the parameters will be loaded and provided to your function
    automatically when the pipeline is executed and it is time to run this node.
    """
    data.columns = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
        "target",
    ]
    classes = sorted(data["target"].unique())
    # One-hot encoding for the target variable
    data = pd.get_dummies(data, columns=["target"], prefix="", prefix_sep="")

    # Shuffle all the data
    data = data.sample(frac=1).reset_index(drop=True)

    # Split to training and testing data
    n = data.shape[0]
    n_test = int(n * example_test_data_ratio)
    training_data = data.iloc[n_test:, :].reset_index(drop=True)
    test_data = data.iloc[:n_test, :].reset_index(drop=True)

    # Split the data to features and labels
    train_data_x = training_data.loc[:, "sepal_length":"petal_width"]
    train_data_y = training_data[classes]
    test_data_x = test_data.loc[:, "sepal_length":"petal_width"]
    test_data_y = test_data[classes]

    # When returning many variables, it is a good practice to give them names:
    return dict(
        train_x=train_data_x,
        train_y=train_data_y,
        test_x=test_data_x,
        test_y=test_data_y,
    )

この後出てくるパイプラインとデータの入出力を合わせる必要がありますが、それ以外は普通の関数の処理であることがわかると思います。書く場所をある程度縛るだけで、ここら辺に特にルールがないのは助かりますね。

パイプラインを編集する

パイプラインの処理で編集するのは二箇所で、pipeline.pyhooks.pyの二つです。

pipeline.py

まずpipeline.pyですが、自分で実装したnodeの関数を、kedro.pipeline.nodeを使って、パイプラインに組み込みます。組み込むときには、第一引数に関数、第二引数に入力で渡すデータ、第三引数に出力のデータを渡します。

from kedro.pipeline import Pipeline, node

from .nodes import split_data


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                split_data,
                ["example_iris_data", "params:example_test_data_ratio"],
                dict(
                    train_x="example_train_x",
                    train_y="example_train_y",
                    test_x="example_test_x",
                    test_y="example_test_y",
                ),
            )
        ]
    )

これだけです。

hooks.py

pipeline.pyで作ったパイプラインを実行できるようにします。また、パイプライン間に依存関係がある場合がほとんどだと思いますので、その処理も書きます。

from typing import Any, Dict, Iterable, Optional

from kedro.config import ConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.versioning import Journal

from ab_recommender.pipelines import data_engineering as de
from ab_recommender.pipelines import data_science as ds


class ProjectHooks:
    @hook_impl
    def register_pipelines(self) -> Dict[str, Pipeline]:
        """Register the project's pipeline.

        Returns:
            A mapping from a pipeline name to a ``Pipeline`` object.

        """
        data_engineering_pipeline = de.create_pipeline()
        data_science_pipeline = ds.create_pipeline()

        return {
            "de": data_engineering_pipeline,
            "ds": data_science_pipeline,
            "__default__": data_engineering_pipeline + data_science_pipeline,
        }

    @hook_impl
    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
        return ConfigLoader(conf_paths)

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )


project_hooks = ProjectHooks()

これだけで、実行順序の依存関係を記述することができます。

まとめ

以上がざっくりとしたkedroの使い方です。まだ試せていないのですが、kedroのフレームに則って書いておけば、AWS Batchやkubeflowなどにデプロイすることができるようになるなど、他にも良さげな感じの物が多いので、使ってみたいと思います。

それでは!

*1:経験日数一週間くらいですがw

*2:あんまりわからなかった

*3:かっ、、書かない言い訳ではない笑