この記事はリクルートライフスタイル Advent Calendar 2018の4日目の記事です。
こんばんは
CETというチームの @mihirat です。
最近ではいくつかの新規サービス開発で、ちょこちょこフロントやサーバー書かせてもらったり、ちょっとしたSRE的な役回りをしていたりします。
また、現在「Jupyterだけで機械学習が実サービス展開できる基盤」(slideshare・blog)の開発などをやってます。ぜひご一読ください。
今回は、社内で活用しているBQ定期実行アプリのご紹介です。
背景
弊社では分析や可視化にTableauを利用することが多いのですが、激重なクエリが描画のたびに発行されて待ち時間が長い・BQの計算資源を使いまくるという問題がありました。
そこで、「プログラムを書かなくても、よしなにBQに定期実行してテーブルを自動更新してくれるアプリケーションがあればなぁ」という話になったので突貫で作ってみたのですが、だんだん利用が増え、現在では100クエリ以上がこのアプリケーション上に登録されています。
「いやいや、スケジュールクエリ機能ってBQにありますよ?」と思われた方もいると思いますが、次のようなメリットがあります。
- クエリのレビュー・共有
- 強い分析官やデータサイエンティストの方から直接レビューを受けることができます。溜まりにくいSQLの知見が共有でき、チームのSQL力が向上します。また、「この前処理ってどう書いたらいいのかな?」な場合のリファレンスになります。
- 野良クエリの撲滅
- BQのスケジュールクエリは大変便利ですが、野良定期実行クエリとは管理側からすると恐ろしい機能です。このアプリがあれば、簡単に定期実行ができかつ管理下におけるので、懸念が減ります。
作ったもの
処理の流れの概要としては、
- GAEを定期的にkick
- kickされたGAEは、GCSに配置されたSQLや設定ファイルを取得し、キューに積む
- キューのConsumerとなるGAEが別に存在し、積まれたキューに従ってクエリを実行し、もろもろ処理する
というシンプルなものになっています。
ポイント1. TaskQueue
TaskQueueとは、GAE付属のキューの機能です(doc)。適当なyamlを書くだけで、キューの最大待ち長さ、消化速度、リトライ回数などが指定できます。
queue:
- name: bqcultivator
rate: 1/m # 消化速度
bucket_size: 100 # キューをためておける数
retry_parameters:
task_retry_limit: 1
max_concurrent_requests: 5 # 同時実行数
これにより、クエリ発行の速度を絞れるほか、リトライの自動化などもできます。
キューに積む処理は非常に簡単で、
task := taskqueue.NewPOSTTask("/bqcultivator/maketable",
url.Values{
"sql_file_name": {sc.SqlFileName},
"project_id": {sc.ProjectID},
"target_project_id":{sc.TargetProjectID},
"dataset_id": {sc.DatasetID},
})
taskqueue.Add(ctx, task, "bqcultivator")
のように書くだけでキューに積まれます。キューの管理がいらないのでとても楽です。
ポイント2. CronJob
CronJobは最近CloudSchedulerとしてスピンアウト?しましたが、GAEのエンドポイントを定期的に実行してくれる機能です(doc)。こちらもyamlを書くだけで設定できます。
- description: hourly check
url: /sqlenqueuer/task/hourly # app endpoint
schedule: every 1 hours from 00:00 to 23:00
timezone: Asia/Tokyo
これだけで、該当するGAEのエンドポイントを定期的に実行してくれます。hourly, daily, weeklyなどを用意しています。
ポイント3. dispatch
GAEは1プロジェクトあたり複数のGAEアプリケーションをデプロイでき、それらはURLベースで振り分けができます(doc)。
dispatch:
- url: "*/sqlenqueuer/*"
service: sqlenqueuer
- url: "*/bqcultivator/*"
service: bqcultivator
これで、CronJobがkickするエンドポイントで、該当するGAEを結びつけます。
アプリケーション部分
sqlenqueuer
CronJobで叩かれるエンドポイントを持ち、叩かれるとGCSにあるファイルをキューに積みます。
設定ファイルには、クエリのオプション(テーブル名、実行時間、Truncateするかどうか)が記載されてます。
bqcultivator
キューを消化して、BQにクエリを発行します。キューは勝手にPOSTされるので、特にキューについての処理を書く必要はなく、POSTに対しての処理だけ書いてあればOKです。
実行結果はslackに連携され、失敗していた場合はリトライリンクもセットにし、メンション付きで以下のようにユーザーに通知します。
2週間くらいで作ったときの超初期バージョンのコードです、もし興味があればご参照ください。クオリティ。。。
このアプリの使い方
- 利用者は、SQLと設定ファイルのyamlをGithub(Enterprise)にpushし、レビューを受ける
- マージされると、GCSにSQLと設定ファイルがCI/CDでアップロードされる
- GAEによって、GCSに配置されたSQLを定期実行
- 設定ファイルに従ってデータマートが作成されたり、結果をS3にファイルをおいたりする(後述)
追加機能:BQへのクエリの結果をそのままAPI化
CETチームでは、汎用APIと呼ばれる基盤を構築・運用しています。
2カラムのCSVを指定のS3にアップロードすると、そのCSVの1カラム目がkey, 2カラム目がvalueとなって、HTTPリクエストでkeyで引くとvalueが返却されるAPIを構築してくれる基盤です。valueには文字列はもちろんのこと、配列やJSONも埋め込み可能です。
詳しくはこちら。より詳細についてはTechBlogに書く予定です。(放置しててすみません)
今回のアプリは当初データマート作成だけしていたのですが、数ヶ月前にこのAPI基盤に連携する機能をリリースしました。
例えばBigQueryにしか置いてないデータを集計した結果や、BQ MLを使った簡単な機械学習の結果をS3に配置することで、そのままAPIにすることができます。
イメージとしては
SELECT user_id, some_json FROM `awesome_data.awesome_mart.awesome_table`
と書くと、 https://endpoint/key/
を叩くと some_json
が返却されるAPIが自動で作成されるものです。
awesome_tableを定期的に見に行くので、APIのレスポンスも日次などで更新されます。主にデータサイエンティストの方から好評の機能で、ABテストの高速化に貢献しています。
終わりに
話は変わり、最近Argoというプロダクトを検証しています。
- ワークフローエンジン (argo)
- CI/CD (argo-ci, argo-cd)
- イベントトリガー処理基盤 (argo-events)
が全部お互いによしなに云々してくれる(まだ何もわかっていない)らしく、バッチ処理など全てをこれに寄せるととてもハッピーな気がしています。
argo化すると、Helmよりも複雑なシステム全体をパッケージ化して共有・展開できそうなので、今回紹介したような基盤もargo化していけると良さそう?と期待しています。検証記事もそのうち書けるよう頑張ります。
よいお年を!