LoginSignup
16
1

More than 3 years have passed since last update.

サーバレスにお手軽なデータBotを作ってみよう〜♪ on GCP

Last updated at Posted at 2019-12-06

この記事は、Classi Advent Calendar 2019 の7日目の記事です。

こんにちは、ClassiのデータAI部で、日々、データと格闘している@yuu_kimyです。
今年も、引き続き、GCP環境でデータ基盤の構築やデータの民主化に関わっています。

今年の8月ぐらいから、正式に、社内のデータ基盤を稼働しています。
名前は、「ソクラテス」と名付けています。データから学びを探り、探究するという意味を込めています。

それに合わせて、社内のデータ活用をより高めていくために、「SQL勉強会」を始めました。こちらの勉強会の内容は、別途、リポートできたらと思っています。

一方で、データの活用を仕組みの面でも後押ししていきたいと思っています。例えば、ちょっとしたデータ抽出でも、定期的にしていくと、それなりの手間になりますが、簡単なBotを作って、自動化にトライしてみたいと思います。

今回の要件

  • BigQueryにあるモニタリング用のテーブルを集計して、必要なデータを抽出する
  • 抽出した結果をSlackの特定チャンネルに配信する
  • 定期的に配信する(週1、月1など)

データBotの構成 on GCP

上記の要件を満たすために、GCPの幾つかのサービスを利用して、こんな風な構成で作ってみたいと思います。
スクリーンショット 2019-12-06 14.44.43.png

利用する材料(サービス)

  • Cloud Scheduler
    • cronジョブスケジューラ。定期スケジューラとして利用。
  • Cloud Pub/Sub
    • 上記のCloud Schedulerが、ターゲットのPub/Subのトピックへメッセージをパブリッシュ。
  • Cloud Functions
    • Pub/Subをトリガーに、BigQueryへのクエリ実行とSlackへ通知。
  • BigQuery
    • 言わずもがなのDWH環境。
  • Slack
    • 今回は、Incoming Webhookを利用。

レシピ(実装の流れ)

  • 必要なクエリを書く(ここは適宜)
  • Cloud Pub/Subのトピックを作成
  • Cloud Schedulerの設定(ターゲットはPub/Subを選択し、上記で作成したトピックを指定)
  • Cloud Functionsで関数の処理を書く(自分はPythonで書きましたが、お好みで、Node.jsやGoで)
    • BigQueryへのデータを取得する
    • データの取得結果の整形
    • Slackへの通知処理

ポイント

もっと、シンプルに作成するのであれば、Cloud Schedulerのターゲットを「http」にして、Cloud Functionsのエンドポイントを指定する構成がありますが、それだと、特定のクエリしか呼び出せず、他のデータBotを作りたい場合は、全く同じような構成を作る羽目になってしまいます。
今回のように、Pub/SubとSchedulerを組み合わせることで、スケジュールごとにトピックのパブリッシュができるので、基本は、それに対応するクエリを用意すれば、簡易的なBotができるというわけです。

  • Sceduler側の設定(ペイロードの定義)
sample.json
{"botId": "X", "sqlCode": "yyy"}

以下は、Cloud Schedulerの画面のサンプルです。
スクリーンショット 2019-12-06 15.29.32.png

Cloud Functionsの中身は、こんな風に書いてみました。(一部抜粋)

main.py
from google.cloud import bigquery
import json
import requests
import base64
import sys
from datetime import datetime,timedelta
from pytz import timezone

def send_to_slack(data, context):
    # Pub/Subのメッセージの取得
    pubsub = base64.b64decode(data['data']).decode('utf-8')
    message = json.loads(pubsub)

    if message["botId"] is not None:
        sql_code = message["sqlCode"]
        result = get_bq_data(sql_code)
    else:
        sys.exit(1)

    # Slack通知の実装(ここでは省略)
    post_slack(result)

def get_bq_data(sql_code):
    # 実際は、slql_codeとクエリを対応づけた辞書を作成しておくのが楽そう
    if sql_code  == 'yyy':
        query = "select from yyy ..."
    elif sql_code == 'zzz':
        query = "select from zzz ..."
    else:
        query = "select from other ..."

    # BigQueryへのクエリ実行
    bigquery_client = bigquery.Client()
    rows = bigquery_client.query(query).result()

    # クエリ結果の整形(ここは見せたい内容次第で、結構変わってくるかと思います)
    result = []
    for row in rows:
        result.append("{},\t{},\t{:,}".format(row[0], row[1], row[2]))

    return result

def post_slack(result):

    url = "web_hook_url"

    dt_now = datetime.now(timezone('Asia/Tokyo')) + timedelta(days=-1)
    target_message = "{0:18-04〜%Y-%m-%d時点までの結果}".format(dt_now)

    # Slackへの通知の見せ方次第で変わってくるかと思います
    requests.post(url, data=json.dumps({
        'text': 'サンプルBot; to: @sample-directors',
        'username': 'サンプルBot',
        'icon_emoji': ':robot_face:',
        'link_names': 1,
        "attachments": [
          {
            "fallback": "this is a bot by gcf",
            "text": "{}\n{}".format(target_message ,"\n".join(result)),
            "color": "#3399FF"
          }
        ]
    }))

上記の設定と関数のデプロイをすることで、下記のような簡易的なデータBotが出来上がりました!
スクリーンショット 2019-12-06 16.28.06.png
※実際に、社内のチーム向けに運用をし始めました。

これで、定期的なデータ抽出のタスクが自動化できました!めでたしめでたし🎉

おわりに

簡単な構成で、サーバレスに実装できるのは素晴らしいですね。
他にも、組み合わせ方次第で、通知の仕組みなども作れるなーと思っています。

明日は、@hailongさんです。どうぞお楽しみに〜♪

16
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
1