きっかけ
BigQuery(BQ)は、大規模データに対してSQLクエリを実行できる強力なツールです。
Table形式のデータやGoogle Cloud Storage(GCS)に保存されたデータに対して、効率的なクエリ処理が可能です。
そんな便利なBQを使っている時、業務上他のAerospike,ElasticSearchなどに保存されているデータもクエリしたいと思うことがしばしば
楽に繋げないかなと調べるとユーザ関数からCloud Runをクエリで使えるのを見つけました。
https://cloud.google.com/functions/docs/tutorials/bigquery?hl=ja
データ取り込みの課題
ファイルデータのBQへの取り込みや外部テーブルとしての処理は比較的容易です。
mysqlは、すでに接続など整備されています。
https://cloud.google.com/bigquery/docs/connect-to-sql?hl=ja
いままでは、データを取得するために専用のコードを書く必要があり、取得したいデータが増えるたびに新しいコードを作成する必要がありました。
業務をしていると BQ に log が溜まってきます。
アクセスしてきたユーザの形跡は BQ にあるけど、解析したデータは AS や ES に入っている。
これをクエリできたら Batch を定期的回さなくて済みます
BQからデータベースへの接続方法
BQから直接データベースに接続する方法は現在提供されていませんが、Cloud Runを利用することで解決できます。
ユーザー関数の活用
BQのユーザー関数を使用することで、特定の処理を定義し、Cloud Runにリクエストを送信することができます。
認証
BQ上だと外部接続という名前で存在します。
これは、ユーザ関数がCloudRunを動作させる権限を持たせます。
Cloud Runの役割
Cloud Runはリクエストを受け取り、適切なレスポンスを返します
データベースへのアクセスコードをAPI化することで、BQ上のクエリから外部データベースへの接続が可能になります。
取得方法
外部データベースからデータを取得する手順は以下の通りです:
- BQユーザー関数の作成:外部データベースへのアクセス用の関数を定義します
- Cloud Run APIの実装:データベース接続とデータ取得のためのエンドポイントを構築します
- 認証の設定:BQがCloud Runと通信できるようにします
実装例
ここでは user_id をBQから渡します
CREATE FUNCTION `project.dataset.get_user_data`(user_id STRING)
RETURNS JSON
REMOTE WITH CONNECTION `project.us.connection`
OPTIONS (endpoint = 'https://your-cloud-run-url/get-user-data')
この関数を使用することで、SQLクエリ内で外部データベースのデータを直接参照できます:
SELECT
user_id,
`project.dataset.get_user_data`(user_id) as user_data
FROM `project.dataset.users`
注意点
- パフォーマンス:大規模データ処理にはバッチ処理の採用を検討してください
- コスト管理:Cloud RunとBQの利用コストを考慮した設計を行ってください
- 例外処理:データベース接続エラーやタイムアウトに適切に対応してください
APIの実装
import os
import sys
import signal
from types import FrameType
from flask import Flask, request, jsonify
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
client = None
@app.route("/", methods=["POST"])
def hogehoge():
try:
# リクエストの形式: https://cloud.google.com/bigquery/docs/remote-functions?hl=ja
data = request.get_json()
calls = data.get("calls", [])
user_defined_context = data.get("userDefinedContext", {})
return jsonify({"replies": ["data-dayo"]}), 200
except Exception as e:
logger.error(e)
return jsonify({"errorMessage": str(e)}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
上記は基本的な実装例です。
calls
入力データのバッチです。JSON配列形式で提供され、各要素はリモート関数呼び出しの引数リストを表すJSON配列となっています。
BQで指定したデータ(user_id)がこの変数に格納され、これを基にデータベースからの取得を行います。
userDefinedContext
BigQueryでリモート関数を作成する際に使用する、ユーザー定義のコンテキストです。
Key-Value形式のJSONオブジェクトで、省略可能です。
関数側で定義していませんが、固定値の受け渡しに使用できます。
例えば、データベースの全テーブルにアクセス可能な関数を作成した場合、その関数を使用できるユーザーが間接的にアクセス権を得てしまう問題があります。
このコンテキストを使用することで、ユーザー関数がアクセスできるテーブルを事前に制限することができます。
https://cloud.google.com/bigquery/docs/remote-functions?hl=ja#input_format
制限事項
https://cloud.google.com/bigquery/quotas?hl=ja#remote_function_limits
HTTP レスポンス サイズの上限(Cloud Run 関数第 2 世代または Cloud Run)
Cloud Run 関数第 2 世代または Cloud Run からの HTTP レスポンス本文の上限は 15 MB です。この値を超えると、クエリが失敗します。
BQからの1回のリクエストで15MBを超えるとエラーとなります。
制限の回避方法
https://cloud.google.com/bigquery/docs/remote-functions?hl=ja#limiting_number_of_rows_in_a_batch_request
BQからCloud Runへのリクエストを分割することが可能です。
OPTIONS (endpoint = 'ほげほげ', max_batching_rows = 1000);
max_batching_rowsを設定することでBQからのリクエストを分割できますが、これは完全な解決策ではありません。
設定した行数でも15MBを超える場合は同様にエラーとなります。
その場合は、クエリの見直しや別途バッチ処理の作成を検討してください。
まとめ
BigQueryと外部データベースの接続は、Cloud Runを活用することで実現可能です。
以下が重要なポイントです
- BQユーザー関数とCloud Run でのAPIを組み合わせることで、SQLクエリから外部データベースへのアクセスが可能になります
- 実装時は認証、パフォーマンス、コスト管理を適切に考慮する必要があります
- 15MBのレスポンスサイズ制限があるため、大規模データ処理には適切な対策が必要です!
この接続方法を活用することで、データ分析の効率化や業務の自動化が実現でき、より柔軟なデータ活用が可能になります。ただし、システムの要件や制限を考慮した適切な設計が重要です。