Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
2
Help us understand the problem. What is going on with this article?
@cocoa-maemae

ETL(ELT)処理用フレームワークcliboaで独自のtransform処理をパイプラインに組み込む方法

概要

ETL(ELT)処理用フレームワークcliboaとは何かではcliboaの概要とクイックスタートまでを説明した。
cliboaは標準でバンドルされているETL処理だけでなく、開発者が独自にETL処理を実装して、実装したETL処理をパイプラインに組み込んで動かすことができる。本記事ではcliboaで独自のtransform処理を実装し、パイプラインに組み込んで動かす方法について紹介する。

前提条件

クイックスタートで説明した手順を一通り完了していること。

ファイル・ディレクトリ構造と拡張実装用ファイルの追加場所

cliboaをインストールした後、cliboadminコマンドによってファイル・ディレクトリ構造が生成される。
この中に、common/scenarioとproject/simple-etl/scenarioディレクトリがあり、拡張実装用のファイルはこの2つのディレクトリのいづれかに配置する。それぞれのディレクトリの役割の考え方としては
・common/scenario: 複数projectから共通で利用する拡張ETL処理
・project/simple-etl/scenario: project固有の拡張ETL処理
となる。

独自のTransform処理を実装する

ETL(ELT)処理の中では、複雑なtransform処理が必要になることが少なくない。ここでは拡張実装の一例として、project固有のtransform処理を実装し、動かす方法を説明する。

transform処理を実装するファイルの配置

project固有のtransform処理が必要であるケースを考え、project/simple-etl/scenario以下にproject_transform.pyファイルを配置する。

|-- Pipfile
|-- bin
|   `-- clibomanager.py
|-- common
|   |-- __init__.py
|   |-- environment.py
|   `-- scenario
|-- conf
|   |-- cliboa.ini
|   `-- logging.conf
|-- logs
|-- project
|   `-- sample-etl
|       |-- scenario
|       |   `-- project_transform.py # ここに配置
|       `-- scenario.yml
`-- requirements.txt

拡張実装の流れ

拡張実装の流れとしては
1. BaseStepクラスをimportして継承したクラスを用意する
2. コンストラクタを実装し、super().__init__()を実装する。
3. scenario.yamlで設定したいargumentsを、インスタンス変数として定義する。加えて、インスタンス変数と同じ名前でsetterを用意する。(setterの定義方法がpythonの標準文法と異なるため注意)
4. executeメソッドを用意する。executeメソッド内で独自の処理を実装する。(メソッド名は必ずexecuteにする必要がある)
のようになる。

独自のtransform処理の実装

例として、project_transform.pyファイルの中で、ProjectTransformというクラスを実装する。

from cliboa.scenario.base import BaseStep


class ProjectTransform(BaseStep):
     """
     プロジェクト固有のtransform処理
     """

     def __init__(self):
         super().__init__()
         self._parameter1 = None
         self._parameter2 = None

   # parameter1のsetter
     def parameter1(self, parameter1):
         self._parameter1 = parameter1

     # parameter2のsetter
     def parameter2(self, parameter2):
         self._parameter2 = parameter2

     def execute(self, *args):
        print(self._parameter1, self._parameter2)
        """
        ここから下に独自のtransform処理を書いていく
        """

environment.pyの修正

common/environment.pyを修正し、定数PROJECT_CUSTOM_CLASSESに拡張実装したファイルの名前空間と、クラス名を文字列で追加する。

(略)

# common custom classes to make available
"""
common/scenario以下で拡張実装した場合は、こちらのリストに名前空間とクラス名を追加する
"""
COMMON_CUSTOM_CLASSES = []

# project congenital classes to make available
"""
今回はここに追加実装した名前空間(.pyを取り除いたファイル名)とクラス名を文字列で追加
"""
PROJECT_CUSTOM_CLASSES = ["project_transform.ProjectTransform"]

scenario.yamlに記載

scenario.yamlで拡張実装したクラスを指定する。

scenario:
# 何らかのextract処理
- step:
  class: ProjectTransform
  arguments:
    parameter1: hoge
    parameter2: hogehoge
# 何らかのload処理

実行

実行方法は実行と同じである。上述の拡張実装を動かした場合、下記のようになれば成功である。

cd sample
python3 bin/clibomanager.py sample-etl

hoge hogehoge

2
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
2
Help us understand the problem. What is going on with this article?