3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

LookerAdvent Calendar 2020

Day 25

LookerアクションをGCPで実装してインハウスDMPを作った話

Last updated at Posted at 2020-12-24

※ この記事はLooker Advent Calender 2020の25日目のものです :santa:

こんにちは。まさおです。
フリュー株式会社で1年ほど前から データニンジャ データアーキテクトをしています。

今回はALL GCPでアクションを実装し、LookerをGUIとしたインハウスDMPツールを構築した話をします。
やたら長いです。すいません。


作ったもの

abst.png
通常、Exploreを使う際はディメンションとメジャーを選んでグラフや表を出力…とするかと思いますが、ここでは各ディメンションでフィルターをかけて、実装の手順1で作成する「ユーザーID一覧メジャー」だけを選んで使います。
それをアクションでポチッと送りつければ、BQ内にあるデータを参照してデータを変換・加工、最終的にGCSに欲しいデータが吐き出されるという算段です。
今回は紹介しないのですが、社内で実用しているものはさらにそこからデータパイプラインをつなげ、ユーザーセグメントを作成する以外の処理を自動化しています。

今回はこの「うまいことする奴」と書いているところの作り方を解説していきます。

うまいことする奴の中身はこんな感じ。
int.png
Looker上では、セグメントのユーザーIDの一覧を出力してアクションに送りつけます。
それを受け取ったアクション側でBQにクエリを投げ、データを変換してGCSに吐き出します。
Looker側で変換先それぞれをフィールドで用意してやれば、カスタムアクションとしてわざわざ実装する必要はないのですが、

  • 変換先自体は集計したいものではない
  • 変換したい種類が増えるたびにフィールドが増えることになるのでExploreの見通しが悪くなる
  • Exploreを使う人間からしたら抽出するときはお決まりのフィールドを選んでアクションの実行時に変換先を選ぶほうが使いやすい
  • 変換する部分の実装はそもそもLookerの関心外だよね
  • BQ上に情報があるならBQの強力な処理能力を活用したら良くない?

ということで、このようなアクションの実装をすることにしました。
もちろんあれこれデータを変換する必要がなければ、こんな実装をする必要は無いです。


実装手順

  1. LookMLを書いてExploreをつくる(シュッっと終わるよ)
  2. Listの実装(シュッっと終わるよ)
  3. Formの実装(シュッっと終わるよ)
  4. Executeの実装(ながいよ)
  5. つくったアクションをAction Hubに追加する(シュッっと終わるよ)
  6. 動かしてみて喜ぶ

LookerのカスタムアクションはList / Form(任意) / Executeの3つのAPIから構成されていて、手順の2〜4で作成しています。
このあたりの話はLookerアドベントカレンダーの20日目の記事で丁寧に解説しておられる方がいらっしゃったので、ぜひそちらもあわせて御覧ください。
ほかにもこのあたりも参考になります。


LookMLを書いてExploreをつくる

LookMLの実装の基本の話はしませんが、データ抽出をするに当たり、大量のデータを適度な長さのArrayで出力させる話だけ紹介します。
というのも、アクションに渡すユーザーID一覧は100万件とかとにかく大量に出力したく、Lookerで扱える行数上限をかいくぐらせるためです。
そして接続先のデータソースがBig Queryの場合は1行の長さ上限があるので、それも程よくかいくぐらせるためです。

ということでLookMLはこんな感じです。

user.view
  # 抽出するユーザーID これをリストにして吐き出したいのでhidden
  dimension: user_id {
    label: "ユーザーID"
    type: number
    primary_key: yes
    value_format_name: id
    sql: ${TABLE}.user_id ;;
    hidden: yes
  }

  # 適当に10行に分割して出力させるためにラベル付をしているディメンション。Exploreを使う人間としてはどうでもいいのでhidden。
  dimension: random_label {
    label: "ユーザーIDのランダム分類用ラベル"
    type: number
    sql: mod(ABS(${user_id}), 10) ;;
    hidden: yes
  }

  # 大本命のuser_idをリスト出だしてくれる奴
  measure: user_id_list {
    label: "ユーザーID一覧"
    type: string
    sql: ARRAY_TO_STRING(ARRAY_AGG(CAST(${user_id} as STRING)), ",") ;;
    required_fields: [random_label]
  }

random_labelなんだコレ

これは user_id を適当に10行に分類するためのラベルです。
ここでは user_id が整数なので、10で割った余剰をラベルとして使っています。
セグメントによっては均等な分割にはなりませんが、目的はLookerとBQの上限をかいくぐるためなので問題なしとしています。

また分割する数を変えたい場合は mod(ABS(${user_id}), 5) とか、mod関数の2つめの引数を変えればOKです。
ここをフィルターオンリーディメンションにすれば、Explore上で任意の数に変更させるなんてことも出来ます。

大本命user_id_list

ディメンションじゃなくてメジャーにすること、そして required_fields: [random_label] を入れることが大事です。
これによって random_label でつけたラベルごとにgroup byしてから配列にできるので、1行を程よい長さにしつつ、行数も程よい長さで出力できます。


Listの実装(シュッと終わるよ)

LookerのAction Hubでこのカスタムアクションを表示するためのパートです。
リクエストを投げたらjsonを返すだけのAPIを用意してやればいいので、Google Cloud Functionで作ります。

Cloud Functionで関数を作成を押したら、関数名を入れて、未認証の呼び出しを許可します。
他の設定はデフォルトのままでOKです。

Cloud Functionはいくつかランタイムが選べますがここではPythonです。

list.py
def action_list(request):
    r = request.get_json()
    
    response = """
        {
           "integrations": [{
                "name": "cloudstorage",
                "label": "IDをデコードして保存するやつ",
                "supported_action_types": ["query"],
                "url": {executeのエンドポイントURL},
                "form_url": {formのエンドポイントURL},
                "icon_data_uri": "",
                "required_fields": [],
                "params": [],
                "supported_formats": ["json"],
                "supported_formattings": ["unformatted"],
                "supported_visualization_formattings": ["noapply"]
           }]
       }

    """
    return response

form_url はアクション実行時になにか入力させたりしない場合は不要です。
このへんを参考に各フィールドを埋めます。

Cloud FunctionでエンドポイントをPythonとすると requirements.txt がデフォルトで作られますが、特に何も使わないので要りません。


Formの実装(シュッと終わるよ)

Listと同じ手順で、実装が違うだけです。

form.py
import json

def action_form(request):
	convert_options = """
		[{
			"name":"hoge_id",
			"label":"hogeプロジェクトのID"
		},
		{
			"name":"label",
			"label":"属性ラベル"
		}]
	"""

	form = """
		[{
			"name":"convert_type",
			"label":"変換タイプ",
			"description":"IDから変換したい情報を選んでください。",
			"type":"select",
			"default":"no",
			"required":true,
			"options":""" + json.dumps(json.loads(convert_options)) + """
		}]
	"""
	return form

この例ではLookerで抽出したユーザーIDを変換する先の選択肢をjsonで返しています。

この関数をデプロイしたら、「トリガー」タブにリクエストを投げるためのエンドポイントURLが出るので、一つ前のListの実装にある form_url に記載し、ListのCloud Functionを再度デプロイします。
ListのCloud Functionを再度デプロイするのは次のExecuteの実装後にも行うので、そちらとまとめて行ってもOKです。

こちらも requirements.txt は特に修正は要りません。


Execute の実装(ながいよ)

今回はCloud Runを使って実装していきます。
レスポンス時間とリクエストサイズがCloud Functionの上限に収まる場合は、ここもCloud Functionで実装してやればDockerfileも要りませんし非常に楽ちんです。

Dockerfileを書く

Dockerfile
FROM python:3.8-slim
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
# ここでGCPのライブラリをいれています。 Flaskやgunicornは次の手順で実装するものをウェブアプリとして動かすためのものです。
RUN pip install --upgrade Flask gunicorn google-cloud-core google-cloud-storage google-cloud-bigquery
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 app:app

データの変換、出力処理を書く

特別な理由がない限り、以下で使用するGCSのバケット、BQ、Cloud Runのデプロイ先は同じリージョンになるようにしましょう。
別リージョンにすると料金が高くついたり、この通りの実装では動かないなどの悲しみが待っています。

Lookerから来たjsonを読んで一旦GCSに保存させる

app.py
def parse_request(request):
    request_json = request.get_json()
    # Formで選んだ変換タイプを取り出す
    convert_type = request_json['form_params']['convert_type']
    # LookerのExploreで出力したデータを取り出す
    query_data = json.loads(request_json['attachment']['data'])
    return convert_type, query_data

# Lookerのビュー名やフィールド名を排除し、2次元配列になっているidを1次元の配列にする
def remove_namespace(data):
    ids = list(x["{LookerのフィールドID}"] for x in data) 
    return list(itertools.chain.from_iterable([x.split(",") for x in ids]))

# 来たリストを1要素1行になるようにしてストレージに書き出す
def output_ids(object_name, id_list):
    # 1要素1行の、csvとして読めるようにする
    str = '\n'.join(map(str, id_list))

    gcs_client = storage.Client(PROJECT_NAME)
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = storage.Blob(object_name, bucket)

    blob.upload_from_string(str)

GCSにあるファイルをBQに一時テーブルとして書き出す

app.py
def create_tmp(tmp_file, table_name, id_list):
    # まずストレージに書き出す
    output_ids(tmp_file, id_list)

    # ストレージからテーブルを作成する
    bq_client = bigquery.Client(PROJECT_NAME)
    dataset_ref = bq_client.dataset(BQ_DATASET_NAME)
    schema = [
        bigquery.SchemaField('id', 'INTEGER'),
    ]
    table = bigquery.Table(dataset_ref.table(table_name), schema=schema)
    external_config = bigquery.ExternalConfig('CSV')
    external_config.source_uris = [
        'gs://' + BUCKET_NAME + '/' + tmp_file
    ]
    table.external_data_configuration = external_config
    bq_client.create_table(table)
    bq_client.close()

一時保存したテーブルを読んでデータを変換、それらをテーブルとして保存GCSに吐き出す

app.py
def convert_and_output(full_tmp_table_name, full_table_name, table_name, convert_type):
    # データ変換してテーブルとして保存させる
    # select文でtmpテーブルとjoinしてデータを変換するクエリを書く
    query = 'create table ' + full_table_name + """
        as (
            select """ + convert_type + """ from fugafuga
            inner join """ + full_tmp_table_name + """ as tmp
            on ...
        )
    """
    client = bigquery.Client()
    client.query(query).result()

    # テーブルに保存したものをcsvファイルとしてGCSに出力する
    # GCS内のファイルパスを組み立てる ここはディレクトリ構造によって変わります
    destination_uri = "gs://{}/{}/{}".format(BUCKET_NAME, convert_type, table_name + '.csv')
    dataset_ref = bigquery.DatasetReference(PROJECT_NAME, BQ_DATASET_NAME)
    table_ref = dataset_ref.table(table_name)
    config = bigquery.ExtractJobConfig()
    config.print_header = False

    extract_job = client.extract_table(
        source=table_ref,
        destination_uris=destination_uri,
        location='US',
        job_config=config,
    )
    extract_job.result()

    client.close()

不要なファイルやテーブルを削除

app.py
def delete_tmp(object_name, tmp_table_name, table_name):
    bq_client = bigquery.Client()
    query = 'drop table ' + table_name
    bq_client.query(query).result()

    query = 'drop table ' + tmp_table_name
    bq_client.query(query).result()
    bq_client.close()

    gcs_client = storage.Client(PROJECT_NAME)
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = storage.Blob(object_name, bucket)
    blob.delete()

全体像

ここまでできるとこんな感じの実装が出来上がります。

app.py
import google.cloud.bigquery as bigquery
import google.cloud.storage as storage
import os
import json
import itertools
from flask import Flask, request
import string

PROJECT_NAME = <GCPのプロジェクト名>
BUCKET_NAME = <GCSのバケット名>
BQ_DATASET_NAME = <BQのデータセット名>

app = Flask(__name__)
# 100MBまでリクエストを受け付ける via https://flask.palletsprojects.com/en/master/config/#MAX_CONTENT_LENGTH
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024

@app.route('/', methods=['POST'])
def main():
    convert_type, query_data = parse_request(request)

    data_to_stream = remove_namespace(query_data)

    table_name = <何かしら一意になるようなテーブル名>

    output(data_to_stream, convert_type, table_name)

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

def parse_request(request):
    request_json = request.get_json()
    # Formで選んだ変換タイプを取り出す
    convert_type = request_json['form_params']['convert_type']
    # LookerのExploreで出力したデータを取り出す
    query_data = json.loads(request_json['attachment']['data'])
    return convert_type, query_data

# Lookerのビュー名やフィールド名を排除し、2次元配列になっているidを1次元の配列にする
def remove_namespace(data):
    ids = list(x["{LookerのフィールドID}"] for x in data) 
    return list(itertools.chain.from_iterable([x.split(",") for x in ids]))

# 来たリストを1要素1行になるようにしてストレージに書き出す
def output_ids(object_name, id_list):
    # 1要素1行の、csvとして読めるようにする
    str = '\n'.join(map(str, id_list))

    gcs_client = storage.Client(PROJECT_NAME)
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = storage.Blob(object_name, bucket)

    blob.upload_from_string(str)

# idのリストから変換先のidを引っ張ってきてファイル出力までする
def output(id_list, convert_type, table_name):    
    # 一連の流れで使うテーブル名、ファイル名を作る
    tmp_file = 'tmp/' + table_name + '.csv'
    full_table_name = '`' + PROJECT_NAME + '.' + BQ_DATASET_NAME + '.' + table_name + '`'
    tmp_table_name = 'tmp_' + table_name
    full_tmp_table_name = '`' + PROJECT_NAME + '.' + BQ_DATASET_NAME + '.' + tmp_table_name + '`'

    # 一時保存するテーブルを作成する
    create_tmp(tmp_file, tmp_table_name, id_list)
    # データを変換してGCSに出力
    convert_and_output(full_tmp_table_name, full_table_name, table_name, convert_type)

    # 一時ファイルとテーブルを削除
    delete_tmp(tmp_file, full_tmp_table_name, full_table_name)

# idのリストをテンポラリテーブルに一時的に書き出す
def create_tmp(tmp_file, table_name, id_list):
    # まずストレージに書き出す
    output_ids(tmp_file, id_list)

    # ストレージからテーブルを作成する
    bq_client = bigquery.Client(PROJECT_NAME)
    dataset_ref = bq_client.dataset(BQ_DATASET_NAME)
    schema = [
        bigquery.SchemaField('id', 'INTEGER'),
    ]
    table = bigquery.Table(dataset_ref.table(table_name), schema=schema)
    external_config = bigquery.ExternalConfig('CSV')
    external_config.source_uris = [
        'gs://' + BUCKET_NAME + '/' + tmp_file
    ]
    table.external_data_configuration = external_config
    bq_client.create_table(table)
    bq_client.close()

# tmpテーブルと照らし合わせてデータを変換する
def convert_and_output(full_tmp_table_name, full_table_name, table_name, convert_type):
    # データ変換してテーブルとして保存させる
    # select文でtmpテーブルとjoinしてデータを変換するクエリを書く
    query = 'create table ' + full_table_name + """
        as (
            select """ + convert_type + """ from fugafuga
            inner join """ + full_tmp_table_name + """ as tmp
            on ...
        )
    """
    client = bigquery.Client()
    client.query(query).result()

    # テーブルに保存したものをcsvファイルとしてGCSに出力する
    # GCS内のファイルパスを組み立てる ここはディレクトリ構造によって変わります
    destination_uri = "gs://{}/{}/{}".format(BUCKET_NAME, convert_type, table_name + '.csv')
    dataset_ref = bigquery.DatasetReference(PROJECT_NAME, BQ_DATASET_NAME)
    table_ref = dataset_ref.table(table_name)
    config = bigquery.ExtractJobConfig()
    config.print_header = False

    extract_job = client.extract_table(
        source=table_ref,
        destination_uris=destination_uri,
        location='US',
        job_config=config,
    )
    extract_job.result()

    client.close()

# テンポラリテーブルとファイルの削除
def delete_tmp(object_name, tmp_table_name, table_name):
    bq_client = bigquery.Client()
    query = 'drop table ' + table_name
    bq_client.query(query).result()

    query = 'drop table ' + tmp_table_name
    bq_client.query(query).result()
    bq_client.close()

    gcs_client = storage.Client(PROJECT_NAME)
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = storage.Blob(object_name, bucket)
    blob.delete()

Cloud Buildを使ってCloud Runにデプロイ&ビルド

Executeの実装ファイルがあるディレクトリで下記コマンドを実行し、コンテナを作成します。
実行時のオプションは他にもあるので、こちらを参照してください。

自分のローカルから実行する場合はgcloudコマンドをインストールしましょう。

gcloud builds submit --tag gcr.io/<GCPプロジェクト名>/<任意のサービス名>

作成したコンテナを使ってCloud Runにデプロイ&ビルドします。
メモリやリージョンの指定は皆様の環境に合わせて変えてください。実行時のオプションは他にもあるので、こちらを参照してください。

gcloud run deploy --image gcr.io/<GCPプロジェクト名>/<任意のサービス名> --platform managed --memory 2.0G --region us-central1

Cloud Runでの起動時のサービス名を聞かれるので、特にこだわりがなければそのままエンターを入力します。
最後に未認証のアクセスを許可するか聞かれる場合があるので、許可する方を選びます。あとからGCPコンソールで変更できます。

デプロイが完了したら下記のような表記がコマンドライン上に表示されるので、最後にかかれているURL部分をListの実装にある url に記載し、Cloud Functionを再度デプロイします。

Deploying container to Cloud Run service [<サービス名>] in project [<GCPプロジェクト名>] region [us-central1]
✓ Deploying new service... Done.
  ✓ Creating Revision...
  ✓ Routing traffic...
Done.

Service [<サービス名>] revision [<ビルドバージョン名>] has been deployed and is serving 100 percent of traffic at https://<サービスごとにユニークなやつ>.a.run.app

またGCPコンソールにてこのCloud Runのサービスの詳細を開くと「URL」としてリクエストを投げるエンドポイントのURLが得られるので、コマンドラインに出てきたURLを控えそこねた場合はそちらから拾ってもOKです。


つくったアクションをAction Hubに追加する

手順2で作成したListのCloud Function関数の、「トリガー」タブにあるURLを拾ってきます。

Lookerの「管理」->「Platform」->「Actions」からAction Hubを開きます。
一番下までいくと、「Add Action Hub」って書いたボタンがるのでそこを押して、表示されるフォームに先程拾ってきた「List」のURLを入力します。
add_action_hub_form.png

登録に失敗したら下記のようにめっちゃ怒られます。
add_failed.png
失敗するときは手順2, 3のCloud Functionのレスポンスjsonが狂っているとか、そもそもレスポンスを返せていないとかが原因なので、きれいな眼で自分の実装と設定を見直しましょう。
Executeの中の実装がバグっていてもLookerはチェックしてくれない(というかそこまではできない)ので、この段階では実装を書いた自分を信じて、ListとFormだけを見直しましょう。

成功したら下記のように「Enabled」と出ます。
add_success.png


動かしてみて喜ぶ

ここまでくれば、Exploreでフィルタ指定してID抽出、アクションポチっとすれば、GCSにファイルが保存されているはずです。
Exploreでポチポチして、動いた、わーいって喜びましょう。

動かない場合はExecuteの実行が失敗したよとLookerからメールが届くと思うので、手順4のExecuteの実装を見直しましょう。
またはアクションで投げたリクエストのサイズがCloud Runの上限(32MB)を超えている可能性があるので、少し小さいリクエストで検証するなどしましょう。


まとめ

最後まで読んでいただいてありがとうございます。
1つの記事に書くボリュームじゃないな?と自分で反省しています。本当に全部読んでいただいた方ありがとうございます!

ということで、今回はLookerをDMPツールとして使うべく、GCPを使ってカスタムアクションを実装しました。
今回はGCSに保存しましたが、実装を書き換えれば別の出力先にも対応出来ますし、保存と同じタイミングでCloud Pub/Subに「ファイル吐き出したよ!」みたいなメッセージを登録させて、さらにデータパイプラインを構築したりもできます。
LookerもGCPの仲間なので全部Google世界でできてしまうのですね。巨人すごい。

ということでトテモトテモ長い記事になりましたが、同じような感じで大量のデータを扱いたいとか、Lookerでデータ抽出したあとにデータ変換して色々したいっていう人の参考になれば幸いです。

それでは良いクリスマスを!ハッピーホリデー! :snowman::apple::gift::evergreen_tree::santa::evergreen_tree::gift::apple::snowman:

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?