12
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

pythonとEmbulkで複数データソース横断のにわかETLを作る

Posted at

最近、TreasureDataと自社のDBのデータを合わせた集計をバッチ処理することが多くpythonでいったんcsvに書き出してEmbulkでアップロードしていましたが、データソースが増えると面倒になっていくのでより簡単で汎用化した書き方を考えました。
結局はWF使ってやったらいい話なので需要があるか不明ですが、WF勉強するのも時間がかかるので慣れてる言語でてっとり早くETLを作りたい人向けの内容です。

前提条件

この記事が適している人

  • 普段はpythonを書いているのでなるべくその範囲でETLしたい
  • pandas-tdの動きが不安定すぎて使えない
  • シェルでの実現方法を調べる時間もない

達成したい機能

  • SQLさえ作ればバッチを追加できるETLの箱を作る
  • 入出力のテーブルが変わっても手間がかからない
  • 記述した順番に処理が実行され、進捗が通知される

環境の準備

Embulkのインストール

すでに存在するTipsで詳しく説明されています。
Embulkをインストール

Embulk用プラグインのインストール

input用とoutput用でプラグインが分かれています。インストールは以下のようなコマンド一発で簡単です。

$ embulk gem install embulk-input-td

今回はPostgreSQLからinputしてtdにoutputしたいのでembulk-output-postgresqlを入れました。
こちらからプラグインを探すことができます。
Embulk(エンバルク)プラグインのまとめ

にわかETLの実装

取り込み設定yaml(liquid)の準備

変数で後から値変更できるようにした取り込み設定ファイルを作成します。プラグインのページにExampleがあるので基本そこをコピペして自分の環境に合わせます。

load_sample.yml.liquid
in:
  type: postgresql
  host: 0.0.0.0
  user: user_name
  password: pass
  database: db_name
  query: |
    {{ env.sql }}
filters:
- type: add_time
  to_column: {name: time, type: timestamp}
  from_value: {mode: upload_time}
out: {type: td, apikey: xxxxxxxxx, endpoint: api.treasuredata.com,
  database: {{ env.o_db }}, table: {{ env.o_tbl }}, time_column: time, mode: append, default_timestamp_format: '%Y-%m-%d
    %H:%M:%S'}

動的変数の設定

{{ env.xxxx }}が動的な値になり環境変数から参照されます。後のpythonで環境変数の設定とEmbulkの呼び出しをすることになります。
ここではホストやID・PWは固定していますが、動的にしても良いですし、固定のパラメタでもログイン情報系は.bashrcか.bash_profileに変数登録しておけばpythonから渡さなくても自動で読み込まれますのでyamlを複数作る際のメンテナンス性およびセキュリティ向上につながります。

スキーマ設定について

私の場合はoutputがTreasureDataだったのでスキーマ設定なしですが、普通のDBに出すときはout:パートにschemaを設定しないと、勝手にpublicに設定されテーブルが見つからないエラーになるので注意。

ETL用pythonの準備

汎用部分の作成

etl_batch.py
import os

def td2pg(o_db, o_tbl, sql):
	os.system('export o_db=' + o_db + ';export o_tbl=' + o_tbl + '; export sql="'+ sql.replace("\n", " ") + '"; embulk run /path_to_liquid_file/load_sample.yml.liquid')

yamlで設定する予定の動的変数はすべて追加してください。ここまで来ればあとはSQLを作って設定した関数を呼ぶだけです。

ETL処理の記述

SQLと出力先DBなどの動的変数を指定して関数を呼ぶ処理を書いていくだけです。
どこかでエラーになった場合のために進捗をprintしておきます。

etl_batch.py
# task1
sql = """
select * from tbl
"""
o_db = "output_db_name"
o_tbl = "output_tbl_name"
td2pg(o_db, o_tbl, sql)

print("task1 is done.")

cronの登録(バッチ稼働させたい人用)

最後にcronで作ったpythonを定期実行する設定をします。
クーロン(cron)をさわってみるお

以下のようにメール送信設定をしておくと、実行時にEmbulkのログとprintした内容が送られてくるのでエラー時にどこまで進捗したか追うことができます。

MAILTO="xxxx@example.com"
30 15 * * * /path_to_python_file/etl_batch.py

この例だと毎日15時30分にETLバッチが実行されることになります。

yamlで.bashrcの環境変数を呼び出している場合

.bashrcに登録した変数を呼び出す場合は、python実行前に変数を有効化する記述が必要です。cronの登録ユーザーが自分であっても.bashrcだけだとエラーになり、フルパスで指定する必要があるので注意。

MAILTO="xxxx@example.com"
30 15 * * * source /user_home/.bashrc; /path_to_python_file/etl_batch.py

#最後に
私はSQL内で動的に日付指定したかったので、pythonでdatetimeもimportして日付生成しています。シェルでもできると思いますが使い慣れた言語のほうが早いですし、途中にpythonのデータ加工を挟むこともできます。
あとは大きなデータでもEmbulkが勝手に分散処理してくれるのでメモリオーバーの心配もありません。
…とはいえpython使い以外には誰得な内容なので誰かのお役に立てれば幸いです。

以上

12
11
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?