Embulk Advent Calendar 2015の7日目の記事です。
こんにちは @sonots です。淡々と拙作のプラグインの紹介をしていこうと思います。文量が多くなったのと明日、明後日が空いていたので、記事を分けて
- 7日目: embulk-filter-column および embulk-filter-row
- 8日目: embulk-output-vertica
- 9日目: embulk-parser-none および embulk-filter-stdout
のように紹介していこうと思います。
embulk-filter-column
URL: https://github.com/sonots/embulk-filter-column
カラムの絞り込みをするための filter プラグインです。
弊社での Embulk のユースケースは、主に HDFS に格納されているログを Vertica に移し替えるという作業なのですが(See Also 何故DeNAがverticaを選んだか?)、
HDFS やローカルファイル、S3 などからファイルを取り出して処理するような場合は、SQL でカラムを絞り込むことができないので、Embulk でカラムを絞り込みたいという要求がありました。そこで、作成したのがこのプラグインです。
標準的な使い方
使い方としては、例えば以下のような入力があったとした場合に
time,id,key,score
2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,1370
2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,3962
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,7323
次のように設定を書くと、
filters:
- type: column
columns:
- {name: time}
- {name: id}
- {name: key}
指定した time, id, key カラムだけ残して (score カラムを落として) くれます。
time,id,key
2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY
2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE
default
値が null だった場合に、default 値を指定して埋め込みたい、という要求があったので、default 機能というものも用意してあります。
例えば以下のような入力があったとした場合に
time,id,key,score
2015-07-13,0,,1370
2015-07-13,1,,3962
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,7323
次のように設定を書くと、
filters:
- type: column
columns:
- {name: time}
- {name: id}
- {name: key, default: foobar}
key カラムの値が null だった行に default: 値を設定してくれます。
time,id,key
2015-07-13,0,foobar
2015-07-13,1,foobar
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE
src
プルリクエストしてもらった機能なのですが、src でカラム名を指定して、コピーすることができます。
使い方としては、例えば以下のような入力があったとした場合に
time,id,key,score
2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,1370
2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,3962
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,7323
次のように設定を書くと、
filters:
- type: column
columns:
- {name: time}
- {name: id}
- {name: key}
- {name: new_key, src: key}
key カラムをコピーした new_key カラムを作ることができます。
time,id,key,new_key
2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY
2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE
こちらでも、default: を併用して、同時に null を埋めることができます。
rename に使用することもできますね (rename に関していうと、標準で rename filter プラグインがありますが)
add_columns
最初は columns だけだったのですが、全カラムを残したまま、カラムの追加だけをしたいケースが生まれたので足した機能がこちらです。
使い方としては、例えば以下のような入力があったとした場合に
id,key
0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY
1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ
2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE
次のように設定を書くと、
filters:
- type: column
add_columns:
- {name: d, type: timestamp, default: "2015-07-13", format: "%Y-%m-%d"}
d カラムを追加してくれます。
id,key,d
0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,2015-07-13
1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,2015-07-13
2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE,2015-07-13
ただし、必ず一番後ろに追加されてしまうので、順番を制御したい場合は columns
のほうを使用してください。
drop_columns
add_columns と対象に、カラムを落とすだけの機能があったほうが便利かな、と思ったので足した機能がこちらです。
使い方としては、例えば以下のような入力があったとした場合に
time,id,key
2015-07-13,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY
2015-07-13,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ
2015-07-13,2,C40P5H1WcBx-aWFDJCI8th6QPEI2DOUgupt_gB8UutE
次のように設定を書くと、
filters:
- type: column
drop_columns:
- {name: key}
key カラムを削除してくれます。
time,id
2015-07-13,0
2015-07-13,1
2015-07-13,2
embulk-filter-row
URL: https://github.com/sonots/embulk-filter-row
行の絞り込みをするための filter プラグインです。
弊社での Embulk のユースケースは(ry、SQL の WHERE 句で行を絞り込むことができないので、Embulk で行を絞り込みたいという要求がありました。そこで、作成したのがこのプラグインです。
標準的な使い方
使い方としては、例えば以下のような入力があったとした場合に
time,foo,bar,flag,id,name,score
2015-07-13,,bar,true,0,Vqjht6YEUBsMPXmoW1iOGFROZF27pBzz0TUkOKeDXEY,1370
2015-07-13,,bar,true,1,VmjbjAA0tOoSEPv_vKAGMtD_0aXZji0abGe7_VXHmUQ,3962
2015-07-13,,bar,true,97,RRNYDAzKaq4Trtt96Bxgk3N0fXLIV8hXoK0qQ7uw_Wc,5065.0
次のように設定を書くと、
filters:
- type: row
condition: AND
conditions:
- {column: foo, operator: "IS NOT NULL"}
- {column: id, operator: ">=", argument: 10}
- {column: id, operator: "<", argument: 20}
- {column: name, opeartor: "include", argument: foo, not: true}
- {column: time, operator: "==", argument: "2015-07-13", format: "%Y-%m-%d"}
全ての条件に合致した行のみ(AND条件)に絞り込んでくれます。
time,foo,bar,flag,id,name,score
2015-07-13,,bar,true,97,RRNYDAzKaq4Trtt96Bxgk3N0fXLIV8hXoK0qQ7uw_Wc,5065.0
OR条件
つい先日、OR条件にも対応しました。
次のようにして、foo カラムもしくは id カラムが null じゃない行を抜き出してくれます。
filters:
- type: row
condition: OR
conditions:
- {column: foo, operator: "IS NOT NULL"}
- {column: id, operator: "IS NOT NULL"}
ORのAND条件
embulk-filter-row としては、直接的には ((A OR B) AND (C OR D))
のような条件式には対応していませんが、filters に複数並べることである程度は表現できると思います。
例えば以下のようにして、(a または b が null じゃない) かつ (c または d が null じゃない) 行を抜き出すことができます。
filters:
- type: row
condition: OR
conditions:
- {column: a, operator: "IS NOT NULL"}
- {column: b, operator: "IS NOT NULL"}
- type: row
condition: OR
conditions:
- {column: c, operator: "IS NOT NULL"}
- {column: d, operator: "IS NOT NULL"}
おわりに
より複雑な条件に対しては、HDFS であれば、Hive や Presto、Impala のようなクエリエンジンを使って、SQL で条件式を組み立てて入力時に絞り込んでしまったほうが良いと思っていて、機能をそこそこに留めている背景があります。
ローカルファイルやS3に関しても、Presto や Apache Drill などは対応しているので (Impala も Unsupported Preview ではあるが機能があるのかな)、それらを使って SQL で条件式を書けるようになったほうが柔軟な対応ができるのではないかと思います。
誰か、動的に Presto を Embulk 上で起動して、クエリを使ってファイルを読み込めるプラグインを作ってくれたりしないかな(チラッ