この記事は、Classi Advent Calendar 2019 の7日目の記事です。
こんにちは、ClassiのデータAI部で、日々、データと格闘している@yuu_kimyです。
今年も、引き続き、GCP環境でデータ基盤の構築やデータの民主化に関わっています。
今年の8月ぐらいから、正式に、社内のデータ基盤を稼働しています。
名前は、「ソクラテス」と名付けています。データから学びを探り、探究するという意味を込めています。
それに合わせて、社内のデータ活用をより高めていくために、「SQL勉強会」を始めました。こちらの勉強会の内容は、別途、リポートできたらと思っています。
一方で、データの活用を仕組みの面でも後押ししていきたいと思っています。例えば、ちょっとしたデータ抽出でも、定期的にしていくと、それなりの手間になりますが、簡単なBotを作って、自動化にトライしてみたいと思います。
今回の要件
- BigQueryにあるモニタリング用のテーブルを集計して、必要なデータを抽出する
- 抽出した結果をSlackの特定チャンネルに配信する
- 定期的に配信する(週1、月1など)
データBotの構成 on GCP
上記の要件を満たすために、GCPの幾つかのサービスを利用して、こんな風な構成で作ってみたいと思います。
利用する材料(サービス)
- Cloud Scheduler
- cronジョブスケジューラ。定期スケジューラとして利用。
- Cloud Pub/Sub
- 上記のCloud Schedulerが、ターゲットのPub/Subのトピックへメッセージをパブリッシュ。
- Cloud Functions
- Pub/Subをトリガーに、BigQueryへのクエリ実行とSlackへ通知。
- BigQuery
- 言わずもがなのDWH環境。
- Slack
- 今回は、Incoming Webhookを利用。
レシピ(実装の流れ)
- 必要なクエリを書く(ここは適宜)
- Cloud Pub/Subのトピックを作成
- Cloud Schedulerの設定(ターゲットはPub/Subを選択し、上記で作成したトピックを指定)
- Cloud Functionsで関数の処理を書く(自分はPythonで書きましたが、お好みで、Node.jsやGoで)
- BigQueryへのデータを取得する
- データの取得結果の整形
- Slackへの通知処理
ポイント
もっと、シンプルに作成するのであれば、Cloud Schedulerのターゲットを「http」にして、Cloud Functionsのエンドポイントを指定する構成がありますが、それだと、特定のクエリしか呼び出せず、他のデータBotを作りたい場合は、全く同じような構成を作る羽目になってしまいます。
今回のように、Pub/SubとSchedulerを組み合わせることで、スケジュールごとにトピックのパブリッシュができるので、基本は、それに対応するクエリを用意すれば、簡易的なBotができるというわけです。
- Sceduler側の設定(ペイロードの定義)
{"botId": "X", "sqlCode": "yyy"}
以下は、Cloud Schedulerの画面のサンプルです。
Cloud Functionsの中身は、こんな風に書いてみました。(一部抜粋)
from google.cloud import bigquery
import json
import requests
import base64
import sys
from datetime import datetime,timedelta
from pytz import timezone
def send_to_slack(data, context):
# Pub/Subのメッセージの取得
pubsub = base64.b64decode(data['data']).decode('utf-8')
message = json.loads(pubsub)
if message["botId"] is not None:
sql_code = message["sqlCode"]
result = get_bq_data(sql_code)
else:
sys.exit(1)
# Slack通知の実装(ここでは省略)
post_slack(result)
def get_bq_data(sql_code):
# 実際は、slql_codeとクエリを対応づけた辞書を作成しておくのが楽そう
if sql_code == 'yyy':
query = "select from yyy ..."
elif sql_code == 'zzz':
query = "select from zzz ..."
else:
query = "select from other ..."
# BigQueryへのクエリ実行
bigquery_client = bigquery.Client()
rows = bigquery_client.query(query).result()
# クエリ結果の整形(ここは見せたい内容次第で、結構変わってくるかと思います)
result = []
for row in rows:
result.append("{},\t{},\t{:,}".format(row[0], row[1], row[2]))
return result
def post_slack(result):
url = "web_hook_url"
dt_now = datetime.now(timezone('Asia/Tokyo')) + timedelta(days=-1)
target_message = "{0:18-04〜%Y-%m-%d時点までの結果}".format(dt_now)
# Slackへの通知の見せ方次第で変わってくるかと思います
requests.post(url, data=json.dumps({
'text': 'サンプルBot; to: @sample-directors',
'username': 'サンプルBot',
'icon_emoji': ':robot_face:',
'link_names': 1,
"attachments": [
{
"fallback": "this is a bot by gcf",
"text": "{}\n{}".format(target_message ,"\n".join(result)),
"color": "#3399FF"
}
]
}))
上記の設定と関数のデプロイをすることで、下記のような簡易的なデータBotが出来上がりました!
※実際に、社内のチーム向けに運用をし始めました。
これで、定期的なデータ抽出のタスクが自動化できました!めでたしめでたし🎉
おわりに
簡単な構成で、サーバレスに実装できるのは素晴らしいですね。
他にも、組み合わせ方次第で、通知の仕組みなども作れるなーと思っています。
明日は、@hailongさんです。どうぞお楽しみに〜♪