個人的に作っているETLツールの紹介をします。
分散処理可能なバルクデータローダ
最近、CSV等のテキストベースのファイルをBigQueryへデータロードする際にEmbulkを使っているのですが、短納期のデータ分析案件で、長時間掛かるデータロードが途中で失敗すると詰んでしまう場合があります。
Embulkのスループットを上げる方法がないか調べたところ、MapReduce Executorというプラグインがあるもののv0.9.18からサポートされなくなっています。
また、分散処理可能なバルクデータローダとしてApache Sqoopというのもありますが、Hadoop基盤を使ってRDBからHDFSやGoogle Cloud Storage等にデータロードができるものらしく今回の用途と合いません。
ちなみに、急ぎの時はApache Beamを使ってデータロード処理のコードを書いて、Google Cloud Dataflow上で実行していました。
そこで、EmbulkのようなYAMLベースのコンフィグファイルでApache Beamのパイプラインを構築・実行できたら楽ではないかと考えました。更に、既存のEmbulkのコンフィグも流用できたら良いなと。
作っているもの
というわけで、Embulkのコンフィグをそのまま(もしくは少し修正するだけで)、Apache Beamのパイプラインへ変換して実行するCLIツールを作り始めました。
GitHub:koji-m/bulqにソースコードがあります。
このツールの特徴としては、Embulkと同じようにバルクデータロードができることと、Embulkの売りでもあるプラグイン機構も実装しています。
使い方
まずは稼働確認として、以下のCSVファイル(sample_01.csv)をgzip圧縮したものを標準出力に出力してみます。
id,account,time,purchase,comment
1,12345,2019-12-15 12:23:22,20191215,bulq
2,99999,2019-12-16 19:30:41,20191216,bulq python
3,03245,2019-12-17 19:50:02,20191217,"bulq ""csv"" parser plugin"
4,11110,2020-01-01 09:54:20,20200101,NULL
以下のようなEmbulkと同じ形式のコンフィグ(config_01.yml)を用意します。
in:
type: file
path_prefix: /work/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}
このコンフィグでは、プレフィックスが/work/sample_のgzip圧縮されたファイルをCSVとして指定のスキーマでパースし、標準出力に出力します。
以下実行結果です。
# 用意したファイル確認
root@ee86123f2463:/work# ls
config_01.yml sample_01.csv.gz
# コンフィグファイルを指定して実行
root@ee86123f2463:/work# bulq run config_01.yml
1,12345,2019-12-15 12:23:22,2019-12-15 00:00:00,bulq
2,99999,2019-12-16 19:30:41,2019-12-16 00:00:00,bulq python
3,3245,2019-12-17 19:50:02,2019-12-17 00:00:00,bulq "csv" parser plugin
4,11110,2020-01-01 09:54:20,20200101,NULL
次に、Google Cloud Storage上のCSVファイルをBigQueryにロードしてみます。
但しその前に、BigQuery用のoutputプラグインを別途インストールする必要があります。以下のように、インストールされているプラグインの確認と新規プラグインのインストールができます。内部的にはPythonのpipを使用してアプリ特有の場所にプラグインがインストールされるため、システムのPythonパッケージ領域を汚しません。
# インストール済みのプラグインの確認
root@ee86123f2463:/work# bulq pip list
----- installed plugins -----
input_file : 0.0.1
output_stdout :
decoder_auto_detect :
decoder_gzip :
parser_csv :
executor_local :
executor_dataflow :
# プラグインのインストール
root@ee86123f2463:/work# bulq pip install git+https://github.com/koji-m/bulq_output_sample_bq.git
installing git+https://github.com/koji-m/bulq_output_sample_bq.git
Running command git clone -q https://github.com/koji-m/bulq_output_sample_bq.git /tmp/pip-req-build-mh7_7jx0
Collecting git+https://github.com/koji-m/bulq_output_sample_bq.git
Cloning https://github.com/koji-m/bulq_output_sample_bq.git to /tmp/pip-req-build-mh7_7jx0
Building wheels for collected packages: bulq-output-sample-bq
Building wheel for bulq-output-sample-bq (setup.py): started
Building wheel for bulq-output-sample-bq (setup.py): finished with status 'done'
Created wheel for bulq-output-sample-bq: filename=bulq_output_sample_bq-0.0.1-cp37-none-any.whl size=2273 sha256=bd3eb7ffdf8272d8c8678fa30a49a9ef60d1e5791ae4a71b68e46fb7fb442d4d
Stored in directory: /tmp/pip-ephem-wheel-cache-ii881b57/wheels/7c/9e/66/d9d85a8c4abbea4cc658e53cef56679a82800e260d2a69c53b
Successfully built bulq-output-sample-bq
Installing collected packages: bulq-output-sample-bq
Successfully installed bulq-output-sample-bq-0.0.1
# 正常にインストールされたか確認
root@ee86123f2463:/work# bulq pip list
----- installed plugins -----
input_file : 0.0.1
output_stdout :
output_sample_bq : 0.0.1 #<- インストールされている
decoder_auto_detect :
decoder_gzip :
parser_csv :
executor_local :
executor_dataflow :
準備が整いましたのでデータロードに進みます。
今回ロードするCSVファイル(sample_02.csv)はBigQuery Public Dataのbikeshare-tripsというデータから抽出したものを使います。
trip_id,subscriber_type,bikeid,start_time,start_station_id,start_station_name,end_station_id,end_station_name,duration_minutes
9900285908,Annual Membership (Austin B-cycle),400,2014-10-26 14:12:00 UTC,2823,Capital Metro HQ - East 5th at Broadway,2544,East 6th & Pedernales St.,10
9900289692,Walk Up,248,2015-10-02 21:12:01 UTC,1006,Zilker Park West,1008,Nueces @ 3rd,39
9900285987,24-Hour Kiosk (Austin B-cycle),446,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,31
9900285989,24-Hour Kiosk (Austin B-cycle),203,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,31
9900285991,24-Hour Kiosk (Austin B-cycle),101,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,30
...
更に、先の例ではローカルのマシン上で処理を実行していましたが、今回はGoogle Cloud Dataflowを使って実行してみます。
コンフィグは以下になります。
exec:
type: dataflow
project: <project-id>
job_name: bulq-test
staging_location: gs://<project-id>/dataflow/staging
temp_location: gs://<project-id>/dataflow/temp
in:
type: file
path_prefix: gs://<bucket-name>/raw/bikeshare_trip
parser:
type: csv
charset: UTF-8
newline: LF
delimiter: ','
quote: '"'
escape: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns: &1
- {name: trip_id, type: long}
- {name: subscriber_type, type: string}
- {name: bikeid, type: long}
- {name: start_time, type: timestamp, format: '%Y-%m-%d %H:%M:%S %Z'}
- {name: start_station_id, type: long}
- {name: start_station_name, type: string}
- {name: end_station_id, type: long}
- {name: end_station_name, type: string}
- {name: duration_minutes, type: long}
decoders:
- {type: gzip}
out:
type: sample_bq
project: <project-id>
dataset: bulq_test
table: bikeshare_trip
mode: replace
gcs_bucket: <tmp-bucket-name>
column_options: *1
実行してみます。
root@ee86123f2463:/work# bulq run config_02.yml
# WARNINGが出ますがすぐにプロンプトが戻ってきます
root@ee86123f2463:/work#
コマンド実行後、暫く待ってからGoogle Cloud ConsoleでCloud Dataflowの画面を表示すると以下の様に処理の実行状況が確認できます。
処理が完了状態となったら、BigQueryにデータがロードされているか確認します。
ちゃんとロードされています。
プラグインの作り方
先のBigQueryへ出力するプラグインのようなサードパーティプラグインを作る場合は、以下のようなPythonパッケージを構成する必要があります。
bulq_<プラグインカテゴリ>_<プラグイン名>
├── bulq_<プラグインカテゴリ>_<プラグイン名>
│ ├── __init__.py #(1)
│ └── bulq_<プラグインカテゴリ>_<プラグイン名>.py #(2)
└── setup.py
この中で(1)__init__.pyではプラグインモジュール読み込ませるだけです。
from . import bulq_<プラグインカテゴリ>_<プラグイン名>
(2)bulq_<プラグインカテゴリ>_<プラグイン名>.pyがプラグインの処理を記述するファイルとなります。以下例ではoutputプラグインとしてSamplePluginというクラスを定義しています。このクラスには4つのメソッド(__init__, prepare, build, setup)を実装する必要があります。
from bulq.core.plugin import output_plugin
@output_plugin('sample') #outputプラグインの場合
class SamplePlugin:
def __init__(self, conf):
# 渡されたコンフィグを基に初期化する処理
def prepare(self, pipeline_options):
# Beamのpipeline optionを設定する処理
def build(self, p):
# Beamのパイプラインを構成する処理
def setup(self):
# Beamパイプラインの実行直前に実行したい処理
詳細は先程使ったoutput_sample_bqプラグインのコードを参照して下さい。
現状と制約
このツールはまだ実装検証段階です。
input/output/executorのリソースへの対応状況はApache BeamのPython SDKの対応状況に依存します。なので、現状inputとしてAWS S3が使えないなど制約があります。なお、executorとしてはLocalとDataflowだけ稼働確認済みですが、Sparkとかでも実行できるかもしれません。
ToDo
- コンフィグファイルのliquid対応
- pip uninstallサブコマンド
- 各ビルトインプラグインの実装
- Cloud Dataflowテンプレートへの対応