BigQuery上のテーブルスキーマ更新は比較的簡単にできるのですが、ビューのスキーマ更新で少々躓きました。
いろいろと調べて、sqlとpython、ymlファイルで実装した内容をまとめています。
前置きが少々長いですが、興味がある方の参考になれば幸いです。
概要・前置き
以前しんゆうさんが「データ整備人(仮)」と呼んでいましたが、私自身社内で近しい業務を行っています。
https://analytics-and-intelligence.net/archives/5826
- 社内のデータ抽出時のネックとして、テーブル構造が複雑で学習コストが高い
例えば何か商品を購入した、というデータを出すのにテーブルをいくつも結合させる必要があったり。。
そこで、テンプレとなっている処理は、viewとしてまとめていくことになりました。
SQLに詳しい方だと処理スピードを考えて、テーブル化してバッチ処理すればいいのでは?と思われるかもしれません。
viewにした理由として、データの持たせ方が変わるかも、ということを想定していました。
この形で当分問題なさそう、という段階になればテーブル化は検討したいと思っています。
- 今後はデータカタログを活用していきたい
BQ上に存在するテーブル名や説明欄、スキーマ名やスキーマの説明欄を検索することができます。
データ抽出時のコスト要因の1つとして、テーブル定義書の確認があります。
BQ上にテーブルやスキーマの説明が記載されていれば、スイッチングコストも減り非常に楽になりそうです。
テーブル内のスキーマ更新は非常に簡単
CREATE文を利用時にオプションパラメータを指定することで、SQL文にてスキーマの説明内容を渡すことができます。
【BigQuery】CREATE文を使えるの知ってた?SQLでテーブル作成
似たような感じで、CREATE VIEWをすれば行けるのかなと最初は思っていたのですが、見事にエラーになりました。
そこで下記記事を参考に、pythonを利用してviewの作成及びスキーマを更新するコードを実装してみました。
[BigQuery] BigQueryのPython用APIの使い方 -テーブル作成編-
開発環境
- windows10
- python 3.8.2
事前の準備
-
スキーマ更新を行うため、サービスアカウントを発行し必要な権限を付与
BigQueryのデータ編集者、閲覧者、ユーザー権限を付与しています。 -
モジュールのinstallが完了していない場合、下記を参照ください
クイックスタート: クライアント ライブラリの使用
ファイル構成
・
┣ create_view.py
┣ upadte_schema.py
┣ key
┃ ┗ jsonキーファイル
┣ description
┃ ┣ import_yml.py
┃ ┗ schema_sample.yml
┗ sql
┗ create_view.sql
実装コード
viewなのかtableなのか迷走している部分があるのですが、よしなにリファクタしていただければ幸いです。
ファイル構成に記載した通り、スキーマの記載をymlファイル内で行っています。
参考にしたサイトと同様、スキーマ更新ファイルはpythonファイル内に指定しても良かったです。
将来的には、各サービス担当のエンジニアの方にスキーマの説明を更新してもらうことなどを見据え、
平易に修正できるymlファイルを採用しています。
#StandardSQL
作成したいテーブルやビューの実行クエリを書いていただければ問題ないです
実行コマンドサンプル
python create_view.py #1.sql/実行したいSQLファイル #2.tableもしくはview #3.テーブルもしくはビュー名
from google.cloud import bigquery
from google.oauth2 import service_account
import sys
import codecs
# PC起動の度に環境変数を設定するのが手間なので、パスを指定して実行
credentials = service_account.Credentials.from_service_account_file(
filename= './key/発行したサービスアカウントのJSONキーファイル',
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
args = sys.argv
# 読み込み用に開き、utfに変換しエラーの場合は文字型で渡す
file = codecs.open(args[1], mode='r', encoding='utf-8', errors='strict')
query = file.read()
# TABLEはReplace可能だが、VIEWは不可能なので、既存のものがある場合は削除する処理を追加
drop_data ='DROP '+args[2]+' IF EXISTS `作成先のプロジェクト名.データセット名.'+args[3]+'`'+';'+'\n'
create_data ='CREATE '+args[2]+' IF NOT EXISTS'+'\n'+'`作成先のプロジェクト名.データセット名.'+args[3]+'`'+'\n' + 'AS' + '\n'+ query
def exec_query():
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
exec_job = client.query(
drop_data + create_data
)
exec_job.result()
if __name__ == '__main__':
exec_query()
file.close()
- サンプルとして作成した、説明欄の内容を記載した ymlファイル
デフォルトでNULLABLEが設定されているため、modeの部分は除外しています
https://cloud.google.com/bigquery/docs/schemas?hl=ja#modes
name: テーブルもしくはビューの名称。view作成用のsqlと同じにした方が管理が楽だと思います
dataset: データセットの名称
desc: テーブルもしくはビューの説明を記載してください
desc: | 文章を途中で改行したい場合は、先頭に | を追加したこちらを利用
https://www.task-notes.com/entry/20150922/1442890800
columns:
- name: カラムの名称。
クエリを書くときに使用するため、名前から分かりやすい名称が好ましいです。
type: 型を記載してください。以下サンプルの対応表です。他の型を利用したい場合は下記を参照。
https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_rules?hl=ja
整数:INTEGER
数値:FLOAT
文字列:STRING
日付:DATE
時間:DATETIME
description: カラムの中身に関する説明を記載してください。
特定の条件だと、どんな数値が取れるのか?など記載いただけると良いと思います。
- name: test
type: STRING
description: こんな感じで書けば大丈夫です
- description配下のスキーマ説明ファイルを読み込みます。
import yaml
import sys
import codecs
args = sys.argv
file_path = r'.\description'+'\\'+args[1]+'.yml'
with codecs.open(file_path, 'r', 'utf-8') as file:
obj = yaml.safe_load(file)
schema_list = obj["columns"]
def schema():
from google.cloud.bigquery import SchemaField
res = []
for i in range(len(schema_list)):
input_schema = SchemaField(name=schema_list[i]['name'], field_type=schema_list[i]['type'], description=schema_list[i]['description'])
res.append(input_schema)
return res
実行コマンドサンプル
python upadte_schema.py #1.作成したymlファイル名
from google.cloud import bigquery
from google.oauth2 import service_account
from description import import_yaml
# サービスアカウントのJSONキーファイルを利用する
# パスを通すのは手間なので、下記を利用する
credentials = service_account.Credentials.from_service_account_file(
filename= './key/発行したサービスアカウントのJSONキーファイル',
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
project_id = "更新したいテーブル・ビューのProjectを指定"
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
# 対象のテーブル・ビュー名
table_name = import_yaml.obj.get("name")
dataset_id = import_yaml.obj.get("dataset")
table_id = "{}.{}.{}".format(client.project, dataset_id, table_name)
# 以下、スキーマの説明更新
schema = import_yaml.schema()
view_schema = bigquery.Table(table_id, schema=schema)
client.update_table(view_schema, ["schema"]) # Make an API request.
# 以下、テーブル・ビュー本体の説明更新
table_ref = client.dataset(dataset_id).table(table_name)
view = client.get_table(table_ref)
# 説明内容を読み込んで更新
view.description = import_yaml.obj.get("desc")
client.update_table(view, ["description"])
今後の展望
今は手動で更新を行っていますが、今後はCI/CDの仕組みも取り入れていきたいです。
スキーマ更新ですが、対象のテーブルやビューのカラムを全て記載する必要があります。
そのためテーブルの中身の更新後、スキーマ更新ファイルが実行される、というフローを構築していきたいです。
また何か分析を行う際に、このデータってどこにあるのか?この条件で抽出していいのか?という、サービス担当者への確認作業が発生したりします。
スキーマの充実によってデータカタログでの検索が容易になれば、分析者も自主的に作業ができ、お互いに本来やりたい業務に時間を割けるようになります。
社内で分析をしたいという方々が、必要な情報を、簡単に探せる環境づくりに繋がったらいいなと思いますし、
同じ悩みを抱えている方の参考になれば幸いです。