Embulk Advent Calendar 2015の8日目の記事です。
こんにちは @sonots です。昨日に引き続き、淡々と拙作のプラグインの紹介をしようと思います。
本日紹介するのは embulk-output-vertica プラグインです。
embulk-output-vertica
URL: https://github.com/sonots/embulk-output-vertica
弊社以外にまだユーザがいなさそうなプラグインですが、Vertica 運用の知見を盛り込んだ出来になっているのがこちらのプラグインです。なんと Vertica にデータを投入することができます。See Also 何故DeNAがverticaを選んだか?
先日書いた embulk-filter-column や embulk-filter-row はパフォーマンスを考えて JRuby ではなく Java で書いているのですが(See Also embulk で pure java と jruby でプラグインを作ったときの速度比較)、embulk-output-vertica は書き込み以外には時間のかかるような処理をしていないですし、書き込みのオーバーヘッドが支配的になるので JRuby で書いています.
標準的な使い方
基本的には接続情報だけ書いてあげればOKです。Input から渡ってきたスキーマ情報を元に自動的にクエリを組み立ててくれます。
out:
type: vertica
host: 127.0.0.1
user: dbadmin
password: xxxxxxx
database: vdb
schema: sandbox
table: embulk_test
copy_mode: DIRECT
abort_on_error: true
テーブルがない場合に自動でテーブルを作る機能があるのですが、デフォルトの vertica の型では不満がある場合は(例えば、embulk の string 型のデータが渡ってきた場合、varchar(80) にしています)、column_options
で補足してあげることもできます。もしくは、手動であらかじめテーブルを作っておいて頂いても構いません。
out:
type: vertica
host: 127.0.0.1
user: dbadmin
password: xxxxxxx
database: vdb
schema: sandbox
table: embulk_test
copy_mode: DIRECT
abort_on_error: true
column_options:
id: {type: INT}
name: {type: VARCHAR(255)}
date: {type: DATE, value_type: timestamp, timezone: "+09:00"}
内部動作の概要
内部動作的には以下のような挙動をしています
- (ターゲットテーブルがない場合は、ターゲットテーブルを作成します)
- テンポラリテーブルを作成し、そこに COPY statement でデータを投入します。
- テンポラリテーブルは、ターゲットテーブルのスキーマ情報を取得して、全く同じスキーマで作成します
- COPY は
COPY スキーマ名.テーブル名 FROM STDIN PARSER fjsonparser
を利用します。結果 fjsonparser の機能をフルに利用することができます。fjsonparser
最強説に準拠しています。
- COPY は
- テンポラリテーブルにデータを投入するのは、ターゲットテーブルの LOCK を取らないようにするためと、冪等性の担保をしやすくするためです。テンポラリテーブルには並列で書き込みます。
- テンポラリテーブルは、ターゲットテーブルのスキーマ情報を取得して、全く同じスキーマで作成します
- テンポラリテーブルにデータの投入が全て終わったら、INSERT SELECT でターゲットテーブルにデータを移し替えます。この処理は速いのですぐ終わります。
- データを移し変えたら、テンポラリテーブルを削除します
copy_mode: DIRECT
Vertica はデフォルトでは、一度 WOS と呼ばれるメモリ領域にデータを格納し、定期的に ROS と呼ばれるディスク領域に書き出すという挙動をします。
(ref. Verticaのデータ格納方法(WOSとROS))
Bulk load のような大容量格納のケースを考えると、その Vertica による最適化を使わずに、直接 ROS 領域に書き込んでしまうほうが効率が良いです。
copy_mode: DIRECT
オプションを使うと ROS に直接書き込むことができます。
書き込み並列数
Vertica は同一テーブルへの並列書き込みをサポートしています。そこで書き込みを並列処理させたい、という要求がでてきます。
一方で、Vertica への同時接続数が多くなると Vertica がエラーを吐くので(弊社の規模の Vertica でも 20 接続で限界です)、同時接続数(== 書き込み並列数)を抑えたいという要求がでてきます。Embulk では input プラグインによっては、並列読み込みをサポートしていて、並列で output プラグインを呼び出すので、愚直に実装すると、input プラグインの並列数分だけ同時接続してしまうことになります。
さらに、copy_mode: DIRECT
の場合、COPY 文を発行する度に、ROS 領域にファイルができてしまい、1024 (弊社の場合)を超えるとエラーが発生してしまうので、COPY 文の数を抑えたいという要求が出てきます。
このような要求すべてに答えるために、embulk-ouput-vertica では、指定した数の分だけ書き込みスレッドを切って、embulk の input から流れてきたデータを内部の Queue にエンキューし、書き込みスレッドでデキューしながら COPY 文に追記していく、という方法を取っています。
また、この Queue はデキューが終わるまではエンキューがブロックされるようにしていて(Ruby の SizedQueue を使っている)、Queue の容量が大きくなってメモリが溢れてしまう、というようなことがないようにしています。その分、input => filter ラインの処理をブロックしてしまいますが、トータルとしてのバランスを考えるとこの方が良いと判断しました。
書き込み並列数は pool
オプションで制御することができます。
pool: 3
失敗した行数
reject_on_materialized_type_error: true
オプションを利用すると、fjsonparser でデータを投入する時に、テーブルの型とデータの型が一致していないような場合に、その行を reject してくれるようになります。
Vertica の COPY 文では、insert に成功した行数、reject された行数が結果として帰ってくるので、ログに出力するようにしています。
2015-12-05 11:54:06.073 +0900 [INFO] (transaction): embulk-output-vertica: task_reports: [{"num_input_rows":2,"num_output_rows":2,"num_rejected_rows":0}]
Embulk の仕組み上、そのような情報を他のファイルに吐き出しておく仕組みが(まだ)ないので、ログをさらえば失敗した行数が見れるようになっているということです。。。
おわりに
Vertica 運用の知見を盛り込んだなかなかの出来になっていると思いますので、是非 Vertica をお持ちの方は(え、ご利用ください。
Embulk の仕組みでできず、独自に実装している機能がそこそこあるので、Embulk 側で書き込み並列数を制御する Fluentd でいう BufferdOutput のような仕組みや(正確には、今回は Enqueue 時に Queue size を超えていたらブロックするようにしたのでちょっと違う)、プラグインが実行結果を吐き出す場所が用意されていると良いですね、と要望を書いておきます ^ ^