LoginSignup
10
5

More than 3 years have passed since last update.

gokartを使ってデータパイプライン上のpandas関連をテストする。

Last updated at Posted at 2020-02-03

書くこと

  • gokartを使ってpandas関連の確認をする方法
  • 1つ目はinputのpd.Dataframeがemptyだったときに正常終了をするかの確認
  • 2つ目はdumpするときに各columnが想定通りの型になっているかの確認

gokartとは?

  • エムスリーやfringe81などが開発しているOSS
  • Spotifyが開発しているluigiをラップして使いやすくしている。特にコードを書く量が減る。

対象バージョン

inputのpd.Dataframeがemptyだったときに正常終了をするかの確認

  • 下記のコードはpd.Dataframeがemptyのときにエラーが発生する。
  • 単体テストを書くことは前提だが、拾いきれないことが多々あった。
class DataTask(gokart.TaskOnKart):
    task_namespace = 'sample'

    def run(self):
        df = pd.DataFrame(dict(user=[1, 2], item=['a', 'b']))
        self.dump(df)


class TaskA(gokart.TaskOnKart):
    task_namespace = 'sample'

    def requires(self):
        return DataTask()

    def run(self):
        df = self.load_data_frame()
        # dfが空のときは`KeyError`が発生する。
        df['user'] = df['user'].apply(lambda x: f'user_{x}')
        self.dump(df)


if __name__ == '__main__':
    gokart.run()
  • 下記を実行するとemptyでも動くかどうかが確認できる。
  • --test-run-pandas--test-run-namespace=sampleとつける。
$ python main.py sample.TaskA --local-scheduler --test-run-pandas --test-run-namespace=sample
  • メッセージ
gokart test results:
status=OK; namespace=sample; name=DataTask; id=10f87ddcf3df71d786a023ae5e0bbc98;
status=NG; namespace=sample; name=TaskA; id=44e9690a4d2182a9bed6b6d9730291bd; message=<class 'KeyError'>: user

dumpするときに各columnが想定通りの型になっているかの確認

  • pandasの型が意図しない操作により変わっていないかを確認する。
  • taskごとに確認してもいいが、namespaceごとにcolumn_nameとtypeの組み合わせを定義する。
  • gokart.PandasTypeConfigを’継承し、namespace内でのルールを定義する。
class SamplePandasTypeCheck(gokart.PandasTypeConfig):
    task_namespace = 'sample'

    @classmethod
    def type_dict(cls) -> Dict[str, Any]:
        return {'user': int}


class DataTask(gokart.TaskOnKart):
    task_namespace = 'sample'

    def run(self):
        df = pd.DataFrame(dict(user=[1, 2], item=['a', 'b']))
        self.dump(df)


class TaskA(gokart.TaskOnKart):
    task_namespace = 'sample'

    def requires(self):
        return DataTask()

    def run(self):
        df = self.load_data_frame()
        # userの型がintからstrに変更されている。
        df['user'] = df['user'].apply(lambda x: f'user_{x}')
        self.dump(df)


if __name__ == '__main__':
    gokart.run()
  • 普通にtaskを実行すれば自動的に型がチェックされる。
  • この場合だとuserstrに変更されているので、TaskAが失敗する。
$ python main.py sample.TaskA --local-scheduler
  • メッセージ
gokart.pandas_type_config.PandasTypeError: expected type is "<class 'int'>", but "<class 'str'>" is passed in column "user".

...

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 ran successfully:
    - 1 sample.DataTask(...)
* 1 failed:
    - 1 sample.TaskA(...)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5