LoginSignup
11
3

More than 3 years have passed since last update.

Apache Beamでバルクデータローダを作っている話

Last updated at Posted at 2019-12-19

個人的に作っているETLツールの紹介をします。

分散処理可能なバルクデータローダ

最近、CSV等のテキストベースのファイルをBigQueryへデータロードする際にEmbulkを使っているのですが、短納期のデータ分析案件で、長時間掛かるデータロードが途中で失敗すると詰んでしまう場合があります。
Embulkのスループットを上げる方法がないか調べたところ、MapReduce Executorというプラグインがあるもののv0.9.18からサポートされなくなっています。
また、分散処理可能なバルクデータローダとしてApache Sqoopというのもありますが、Hadoop基盤を使ってRDBからHDFSやGoogle Cloud Storage等にデータロードができるものらしく今回の用途と合いません。
ちなみに、急ぎの時はApache Beamを使ってデータロード処理のコードを書いて、Google Cloud Dataflow上で実行していました。
そこで、EmbulkのようなYAMLベースのコンフィグファイルでApache Beamのパイプラインを構築・実行できたら楽ではないかと考えました。更に、既存のEmbulkのコンフィグも流用できたら良いなと。

作っているもの

というわけで、Embulkのコンフィグをそのまま(もしくは少し修正するだけで)、Apache Beamのパイプラインへ変換して実行するCLIツールを作り始めました。

GitHub:koji-m/bulqにソースコードがあります。

このツールの特徴としては、Embulkと同じようにバルクデータロードができることと、Embulkの売りでもあるプラグイン機構も実装しています。

使い方

まずは稼働確認として、以下のCSVファイル(sample_01.csv)をgzip圧縮したものを標準出力に出力してみます。

sample_01.csv
id,account,time,purchase,comment
1,12345,2019-12-15 12:23:22,20191215,bulq
2,99999,2019-12-16 19:30:41,20191216,bulq python
3,03245,2019-12-17 19:50:02,20191217,"bulq ""csv"" parser plugin"
4,11110,2020-01-01 09:54:20,20200101,NULL

以下のようなEmbulkと同じ形式のコンフィグ(config_01.yml)を用意します。

config_01.yml
in:
  type: file
  path_prefix: /work/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

このコンフィグでは、プレフィックスが/work/sample_のgzip圧縮されたファイルをCSVとして指定のスキーマでパースし、標準出力に出力します。

以下実行結果です。


# 用意したファイル確認
root@ee86123f2463:/work# ls
config_01.yml  sample_01.csv.gz

# コンフィグファイルを指定して実行
root@ee86123f2463:/work# bulq run config_01.yml 
1,12345,2019-12-15 12:23:22,2019-12-15 00:00:00,bulq
2,99999,2019-12-16 19:30:41,2019-12-16 00:00:00,bulq python
3,3245,2019-12-17 19:50:02,2019-12-17 00:00:00,bulq "csv" parser plugin
4,11110,2020-01-01 09:54:20,20200101,NULL

次に、Google Cloud Storage上のCSVファイルをBigQueryにロードしてみます。
但しその前に、BigQuery用のoutputプラグインを別途インストールする必要があります。以下のように、インストールされているプラグインの確認と新規プラグインのインストールができます。内部的にはPythonのpipを使用してアプリ特有の場所にプラグインがインストールされるため、システムのPythonパッケージ領域を汚しません。


# インストール済みのプラグインの確認
root@ee86123f2463:/work# bulq pip list
----- installed plugins -----
input_file : 0.0.1
output_stdout : 
decoder_auto_detect : 
decoder_gzip : 
parser_csv : 
executor_local : 
executor_dataflow : 

# プラグインのインストール
root@ee86123f2463:/work# bulq pip install git+https://github.com/koji-m/bulq_output_sample_bq.git
installing git+https://github.com/koji-m/bulq_output_sample_bq.git
  Running command git clone -q https://github.com/koji-m/bulq_output_sample_bq.git /tmp/pip-req-build-mh7_7jx0
Collecting git+https://github.com/koji-m/bulq_output_sample_bq.git
  Cloning https://github.com/koji-m/bulq_output_sample_bq.git to /tmp/pip-req-build-mh7_7jx0
Building wheels for collected packages: bulq-output-sample-bq
  Building wheel for bulq-output-sample-bq (setup.py): started
  Building wheel for bulq-output-sample-bq (setup.py): finished with status 'done'
  Created wheel for bulq-output-sample-bq: filename=bulq_output_sample_bq-0.0.1-cp37-none-any.whl size=2273 sha256=bd3eb7ffdf8272d8c8678fa30a49a9ef60d1e5791ae4a71b68e46fb7fb442d4d
  Stored in directory: /tmp/pip-ephem-wheel-cache-ii881b57/wheels/7c/9e/66/d9d85a8c4abbea4cc658e53cef56679a82800e260d2a69c53b
Successfully built bulq-output-sample-bq
Installing collected packages: bulq-output-sample-bq
Successfully installed bulq-output-sample-bq-0.0.1

# 正常にインストールされたか確認
root@ee86123f2463:/work# bulq pip list
----- installed plugins -----
input_file : 0.0.1
output_stdout : 
output_sample_bq : 0.0.1  #<- インストールされている
decoder_auto_detect : 
decoder_gzip : 
parser_csv : 
executor_local : 
executor_dataflow : 

準備が整いましたのでデータロードに進みます。
今回ロードするCSVファイル(sample_02.csv)はBigQuery Public Dataのbikeshare-tripsというデータから抽出したものを使います。

sample_02.csv
trip_id,subscriber_type,bikeid,start_time,start_station_id,start_station_name,end_station_id,end_station_name,duration_minutes
9900285908,Annual Membership (Austin B-cycle),400,2014-10-26 14:12:00 UTC,2823,Capital Metro HQ - East 5th at Broadway,2544,East 6th & Pedernales St.,10
9900289692,Walk Up,248,2015-10-02 21:12:01 UTC,1006,Zilker Park West,1008,Nueces @ 3rd,39
9900285987,24-Hour Kiosk (Austin B-cycle),446,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,31
9900285989,24-Hour Kiosk (Austin B-cycle),203,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,31
9900285991,24-Hour Kiosk (Austin B-cycle),101,2014-10-26 15:12:00 UTC,2712,Toomey Rd @ South Lamar,2712,Toomey Rd @ South Lamar,30
...

更に、先の例ではローカルのマシン上で処理を実行していましたが、今回はGoogle Cloud Dataflowを使って実行してみます。
コンフィグは以下になります。

config_02.yml
exec:
  type: dataflow
  project: <project-id>
  job_name: bulq-test
  staging_location: gs://<project-id>/dataflow/staging
  temp_location: gs://<project-id>/dataflow/temp
in:
  type: file
  path_prefix: gs://<bucket-name>/raw/bikeshare_trip
  parser:
    type: csv
    charset: UTF-8
    newline: LF
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns: &1
    - {name: trip_id, type: long}
    - {name: subscriber_type, type: string}
    - {name: bikeid, type: long}
    - {name: start_time, type: timestamp, format: '%Y-%m-%d %H:%M:%S %Z'}
    - {name: start_station_id, type: long}
    - {name: start_station_name, type: string}
    - {name: end_station_id, type: long}
    - {name: end_station_name, type: string}
    - {name: duration_minutes, type: long}
  decoders:
  - {type: gzip}
out:
  type: sample_bq
  project: <project-id>
  dataset: bulq_test
  table: bikeshare_trip
  mode: replace
  gcs_bucket: <tmp-bucket-name>
  column_options: *1

実行してみます。


root@ee86123f2463:/work# bulq run config_02.yml

# WARNINGが出ますがすぐにプロンプトが戻ってきます

root@ee86123f2463:/work# 

コマンド実行後、暫く待ってからGoogle Cloud ConsoleでCloud Dataflowの画面を表示すると以下の様に処理の実行状況が確認できます。

スクリーンショット 2019-12-18 22.16.55.png

処理が完了状態となったら、BigQueryにデータがロードされているか確認します。

スクリーンショット 2019-12-18 22.18.05.png

ちゃんとロードされています。

プラグインの作り方

先のBigQueryへ出力するプラグインのようなサードパーティプラグインを作る場合は、以下のようなPythonパッケージを構成する必要があります。

bulq_<プラグインカテゴリ>_<プラグイン名>
├── bulq_<プラグインカテゴリ>_<プラグイン名>
│   ├── __init__.py                           #(1)
│   └── bulq_<プラグインカテゴリ>_<プラグイン名>.py   #(2)
└── setup.py

この中で(1)__init__.pyではプラグインモジュール読み込ませるだけです。

from . import bulq_<プラグインカテゴリ>_<プラグイン名>

(2)bulq_<プラグインカテゴリ>_<プラグイン名>.pyがプラグインの処理を記述するファイルとなります。以下例ではoutputプラグインとしてSamplePluginというクラスを定義しています。このクラスには4つのメソッド(__init__, prepare, build, setup)を実装する必要があります。

from bulq.core.plugin import output_plugin

@output_plugin('sample') #outputプラグインの場合
class SamplePlugin:
    def __init__(self, conf):
        # 渡されたコンフィグを基に初期化する処理

    def prepare(self, pipeline_options):
        # Beamのpipeline optionを設定する処理

    def build(self, p):
        # Beamのパイプラインを構成する処理

    def setup(self):
        # Beamパイプラインの実行直前に実行したい処理

詳細は先程使ったoutput_sample_bqプラグインのコードを参照して下さい。

現状と制約

このツールはまだ実装検証段階です。
input/output/executorのリソースへの対応状況はApache BeamのPython SDKの対応状況に依存します。なので、現状inputとしてAWS S3が使えないなど制約があります。なお、executorとしてはLocalとDataflowだけ稼働確認済みですが、Sparkとかでも実行できるかもしれません。

ToDo

  • コンフィグファイルのliquid対応
  • pip uninstallサブコマンド
  • 各ビルトインプラグインの実装
  • Cloud Dataflowテンプレートへの対応
11
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3