Embulk
digdag

bigquery, embulk, digdagでETL処理をやって

EMBULKとDIGDAGの話がしたくて仕方がないんですけど、うちの会社でDIGDAGとかEMBULK知ってる人が全くいなくてここに残します。
サーバー → Bigquery → Mysql → SQL Server の流れでデーターを毎日ロードするシステムを作ってます。
SQLServerの後もバッチプロシージャで集計したり色々大変な作業が待ってるんですけど、それはDBAがやってくれるとして… その前のETLだけを自分がやってます。
Bigqueryは普通こんなETLの流れの最後の部分に位置するんですけど、うちの会社のいろんな事情で…BigqueryからRDBMSへのフローを作らなくちゃならなくなったんです。
Bigqueryからのダウンロードはembulkでやってますけど、思ったより大変でした。
最初はembulk-input-bigqueryとembulk-output-mssqlとか使ったら簡単!…と思ってやってみたんですけど、現実はそう甘くなく。ちょっと時間がかかりました。
inputの話.
embulk-input-bigquery プラグインはテーブルデーターが多かったら(大体6G以上)弱40000件くらいしか持ってこれなくて、それ修正して使うとしたら重くなってメモリーも大食い。このプラグインはデーターロードにBigquery Streaming API使ってるからこれは仕方ないと思いました。それでそれをforkして何とかファイル経由のロード機能を作れないかって思ってルビーコードを読んでみましたけど、ちょっと難しい。何か参考になるものがないかと思ってEMBULKのドキュメントみたらそもそもembulkのrubyプラグインはembulk-input-fileを継承できないみたいです。参考するソースコードがない。それでembulk new java-file-input を実行して、新しくembulk-input-bigquery_extract_files というプラグインを作ってとりあえず回しました。
でも今思えばこんなプラグイン作るよりDIGDAGでやったほうが早かったんじゃないかと…ちょっと後悔。
outputの話。
embulk-output-sqlserver のプラグインはinsert_method: normalではすごく遅い!
insert_method: nativeは早かったんですけど、うちのマシーンではプロセスの最後にsqlncli11.dllのbcp_doneでクラッシュが起こりました。
Windows7でも出来ないし、サーバーのCENTOS6でやってみても何故かエラーで動かない。Windowsサーバーはまだ試してないけど、いますぐ会社からサーバーをもらえるかどうかわからない。
それでembulk-output-sqlserver のソースをダウンロードして、色々といじってbcp_doneの部分を消したら何とか動きましたけど、そごく不安。
仕方なく今はembulk-output-mysqlを使ってMysql経由でSQLServerに流してます。
BCP_DONEで発生したエラーを再現して報告したいんですけど、今は時間がないので後で…
とりあえす動くから‥ちょっと引っかかるけど…まぁembulkはここまで!
(12/8) 追伸。
この問題を報告したら直してくれました!
https://github.com/embulk/embulk-output-jdbc/issues/210
embulk-output-sqlserverは今は問題なく使ってます。
ありがとうございます!!

でもまだた。まだ終わってない!(スネーク!)
あとの問題はBigquery → Mysql この区間で、すべて日付テーブルでまるまる持ってきたら苦労しないんですけど、クエリでやらなくちゃいけなくなって…
毎回クエリをダイナミックに作ってEMBULKのINPUTに繋げないといけない。
パラメーターをMysqlとBigqueryから持ってきてクエリを作らなくちゃならないから、liquidではちょっと足りない。
しかもテーブルも一つじゃなくて沢山。
それでPythonとかShell Scriptを使ってEmbulkの設定ファイル作って繋げたらどうか!って考えてみたけど、なんか雑な気がする!複雑になる!
そこでワークフローを使ったら、ETLはもちろん、そのあとの集計もちょっと楽になるんじゃないかって考えてました。
DIGDAG、AIRFLOW、LUIGIとか検討してみました。
DIGDAGのYAML設定は一番気に入ったんですけど、UIがAIRFLOWに比べたらちょっと地味。検索してみたらGCPをタスクコマンドもAIRFLOWに比べて少ないみたいだし。LUIGIは…なんかDIGDAGとAIRFLOWの中間のあたりの感じ。AIRFLOWもDIGDAGも失敗したらやってみるとしました。
それで最初はAIRFLOWでやってみたんですけど、これやってみるとなんだか微妙。DAGを作るPYTHONコードもなんだか雑な感じで、動的にDAGを作るとしたら何かStartDate関連でエラー発生するし。しかもUIで管理したDAGスケヂュールはなんか予想通りにならないし、個別にCLIでDAGを実行するのもやや混乱。DIGDAGはUI以外は全て気に入ったのに!それでDIGDAGに転換。GCPタスクはPYTHONでなんとかしてみると思った。
で、やってみるとすごく楽!
動的にクエリ作って、EMBULKに繋げて、MySQLに無事インサート完了!簡単!
この単純なものがAIRFLOWではなんて大変だったのか…
まぁ、ここまではなんとか出来たし、あとはプログラムを拡張するだけ。
でもいままで簡単だと思ってそのまま出来た歴史がない!(涙
まぁ…それでも成功したら後で全て記録に残すとします。
では。