はじめに
ETLツールの一種であるEmbulkでは、データ転送の処理をyamlで設定して実行します。
動的に処理内容を変更したい場合、yamlの一部を環境変数に置き換えることができます。
しかし、処理内容を実行ごとに変化させたい場合、環境変数で設定をするのは面倒です。例えば、出力するファイル名に実行時間を加えたい場合などには、環境変数を逐一変更することになります。
この記事では、Python用のテンプレートエンジンであるjinja2を使って、簡単にyamlの設定ファイルを動的生成する方法について紹介します。より作り込んだシステムが必要な場合には、Digdagなどのワークフローエンジンを使うことが良いです。
そもそもEmbulkとは
EmbulkとはOSSのバッチ型のデータ転送ツールです。ストリーミングデータを対象にしたfluentdと対になるようなツールです。コンパクトながら、たくさんの入出力のデータソースに対応しておりとても便利です。
日本人の古橋さんという方が主導で開発されているようです。とてもありがたいです。下のリンクはリリース時の記事です。
簡単な使い方を紹介します。データ転送(ETL)の入力と出力の処理内容を、下のようにyamlで定義してコマンド実行します。例では、gzipで圧縮されたcsvを標準出力する処理を示しています。設定内容の補完機能があり、下のような設定を全て手入力で行う必要は無く、また、プレビュー機能やフィルター機能なども利用できます。
in: # 入力データの内容(ETLのExtract部分)
type: file
path_prefix: ./try1/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout} # 出力データの内容(ETLのLoad部分)
jinja2でyamlを動的に生成
前置きが長くなりましたが、jinja2でyamlを生成するコード例について紹介します。
以下では、テンプレートとなるyamlファイルをもとに、実行するyamlファイルを生成し、Embulkで実行しています。
具体的には、mysqlからデータを抽出し、gzip化されたcsvファイルを出力する処理を行っています。テンプレートエンジンを使うことで、ファイル名などの動的な設定を(簡単に!)できます。
# テンプレートのyamlファイル
in:
type: mysql
host: localhost
user: <ユーザ名>
password: <パスワード>
database: sample
table: user
select: "id, name"
out:
type: file
path_prefix: ./data/sample_{{ start_time }} # 変数を入れたい箇所
sequence_format: .
file_ext: csv.gz
formatter:
type: csv
encoders:
- type: gzip
# Embulkを実行するためのPythonスクリプト
import datetime
import os
from jinja2 import Template # jinja2パッケージを使う!!
# 現在時刻の取得
dt_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
# yamlテンプレートの取得
with open('template.yml') as f:
template_yml = f.read()
teamplate = Template(source=template_yml)
# 変数の代入
new_yml = teamplate.render(start_time=dt_now)
# 実行するyamlファイルの作成
with open('config.yml', 'w') as f:
f.write(new_yml)
# Embulkの実行
os.system('embulk run config.yml')
# Embulkで実行するyamlファイル(実行ごとに生成)
in:
type: mysql
host: localhost
user: <ユーザ名>
password: <パスワード>
database: sample
table: user
select: "id, name"
out:
type: file
path_prefix: ./data/sample_20221002_162745 # 動的に変更した箇所
sequence_format: .
file_ext: csv.gz
formatter:
type: csv
encoders:
- type: gzip
おわりに
今回はPythonのテンプレートエンジンであるjinja2を使って、yamlを動的に生成する方法について紹介しました。flaskでjinja2を使っていたのを思い出して使ってみましたが、簡単で結構便利です。Embulkに限らず、設定ファイルの管理などにも使えそうです。
まだ、Digdagなどのワークフローエンジンは使ったことがないので、そちらの方も試してみたいです。