LoginSignup
1
2

More than 1 year has passed since last update.

PythonでEmbulkを動的に実行する

Last updated at Posted at 2022-10-02

はじめに

ETLツールの一種であるEmbulkでは、データ転送の処理をyamlで設定して実行します。
動的に処理内容を変更したい場合、yamlの一部を環境変数に置き換えることができます。

しかし、処理内容を実行ごとに変化させたい場合、環境変数で設定をするのは面倒です。例えば、出力するファイル名に実行時間を加えたい場合などには、環境変数を逐一変更することになります。
この記事では、Python用のテンプレートエンジンであるjinja2を使って、簡単にyamlの設定ファイルを動的生成する方法について紹介します。より作り込んだシステムが必要な場合には、Digdagなどのワークフローエンジンを使うことが良いです。

そもそもEmbulkとは

EmbulkとはOSSのバッチ型のデータ転送ツールです。ストリーミングデータを対象にしたfluentdと対になるようなツールです。コンパクトながら、たくさんの入出力のデータソースに対応しておりとても便利です。

日本人の古橋さんという方が主導で開発されているようです。とてもありがたいです。下のリンクはリリース時の記事です。

簡単な使い方を紹介します。データ転送(ETL)の入力と出力の処理内容を、下のようにyamlで定義してコマンド実行します。例では、gzipで圧縮されたcsvを標準出力する処理を示しています。設定内容の補完機能があり、下のような設定を全て手入力で行う必要は無く、また、プレビュー機能やフィルター機能なども利用できます。

処理内容.yml
in: # 入力データの内容(ETLのExtract部分)
  type: file
  path_prefix: ./try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout} # 出力データの内容(ETLのLoad部分)

jinja2でyamlを動的に生成

前置きが長くなりましたが、jinja2でyamlを生成するコード例について紹介します。
以下では、テンプレートとなるyamlファイルをもとに、実行するyamlファイルを生成し、Embulkで実行しています。
具体的には、mysqlからデータを抽出し、gzip化されたcsvファイルを出力する処理を行っています。テンプレートエンジンを使うことで、ファイル名などの動的な設定を(簡単に!)できます。

template.yml
# テンプレートのyamlファイル
in:
  type: mysql
  host: localhost
  user: <ユーザ名>
  password: <パスワード>
  database: sample
  table: user
  select: "id, name"
out:
  type: file
  path_prefix: ./data/sample_{{ start_time }} # 変数を入れたい箇所
  sequence_format: .
  file_ext: csv.gz
  formatter:
    type: csv
  encoders:
  - type: gzip
execute_embulk.py
# Embulkを実行するためのPythonスクリプト
import datetime
import os

from jinja2 import Template # jinja2パッケージを使う!!

# 現在時刻の取得
dt_now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

# yamlテンプレートの取得
with open('template.yml') as f:
    template_yml = f.read()
teamplate = Template(source=template_yml)

# 変数の代入
new_yml = teamplate.render(start_time=dt_now)

# 実行するyamlファイルの作成
with open('config.yml', 'w') as f:
    f.write(new_yml)

# Embulkの実行
os.system('embulk run config.yml')
config.yml
# Embulkで実行するyamlファイル(実行ごとに生成)
in:
  type: mysql
  host: localhost
  user: <ユーザ名>
  password: <パスワード>
  database: sample
  table: user
  select: "id, name"
out:
  type: file
  path_prefix: ./data/sample_20221002_162745 # 動的に変更した箇所
  sequence_format: .
  file_ext: csv.gz
  formatter:
    type: csv
  encoders:
  - type: gzip

おわりに

今回はPythonのテンプレートエンジンであるjinja2を使って、yamlを動的に生成する方法について紹介しました。flaskでjinja2を使っていたのを思い出して使ってみましたが、簡単で結構便利です。Embulkに限らず、設定ファイルの管理などにも使えそうです。
まだ、Digdagなどのワークフローエンジンは使ったことがないので、そちらの方も試してみたいです。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2