0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

BigQuery上のテーブル・ビュー及びスキーマ更新をコード管理する

Last updated at Posted at 2020-05-09

BigQuery上のテーブルスキーマ更新は比較的簡単にできるのですが、ビューのスキーマ更新で少々躓きました。
いろいろと調べて、sqlとpython、ymlファイルで実装した内容をまとめています。

前置きが少々長いですが、興味がある方の参考になれば幸いです。

概要・前置き

以前しんゆうさんが「データ整備人(仮)」と呼んでいましたが、私自身社内で近しい業務を行っています。
https://analytics-and-intelligence.net/archives/5826

  • 社内のデータ抽出時のネックとして、テーブル構造が複雑で学習コストが高い
    例えば何か商品を購入した、というデータを出すのにテーブルをいくつも結合させる必要があったり。。

そこで、テンプレとなっている処理は、viewとしてまとめていくことになりました。

SQLに詳しい方だと処理スピードを考えて、テーブル化してバッチ処理すればいいのでは?と思われるかもしれません。
viewにした理由として、データの持たせ方が変わるかも、ということを想定していました。

この形で当分問題なさそう、という段階になればテーブル化は検討したいと思っています。

  • 今後はデータカタログを活用していきたい
    BQ上に存在するテーブル名や説明欄、スキーマ名やスキーマの説明欄を検索することができます。

データ抽出時のコスト要因の1つとして、テーブル定義書の確認があります。
BQ上にテーブルやスキーマの説明が記載されていれば、スイッチングコストも減り非常に楽になりそうです。

テーブル内のスキーマ更新は非常に簡単

CREATE文を利用時にオプションパラメータを指定することで、SQL文にてスキーマの説明内容を渡すことができます。
【BigQuery】CREATE文を使えるの知ってた?SQLでテーブル作成

似たような感じで、CREATE VIEWをすれば行けるのかなと最初は思っていたのですが、見事にエラーになりました。

そこで下記記事を参考に、pythonを利用してviewの作成及びスキーマを更新するコードを実装してみました。
[BigQuery] BigQueryのPython用APIの使い方 -テーブル作成編-

開発環境

  • windows10
  • python 3.8.2

事前の準備

  • スキーマ更新を行うため、サービスアカウントを発行し必要な権限を付与
    BigQueryのデータ編集者、閲覧者、ユーザー権限を付与しています。

  • モジュールのinstallが完了していない場合、下記を参照ください
    クイックスタート: クライアント ライブラリの使用

ファイル構成


┣ create_view.py
┣ upadte_schema.py
┣ key
┃ ┗ jsonキーファイル
┣ description
┃ ┣ import_yml.py
┃ ┗ schema_sample.yml
┗ sql
  ┗ create_view.sql

実装コード

viewなのかtableなのか迷走している部分があるのですが、よしなにリファクタしていただければ幸いです。

ファイル構成に記載した通り、スキーマの記載をymlファイル内で行っています。

参考にしたサイトと同様、スキーマ更新ファイルはpythonファイル内に指定しても良かったです。
将来的には、各サービス担当のエンジニアの方にスキーマの説明を更新してもらうことなどを見据え、
平易に修正できるymlファイルを採用しています。

create_view.sql
#StandardSQL
作成したいテーブルやビューの実行クエリを書いていただければ問題ないです

実行コマンドサンプル
python create_view.py #1.sql/実行したいSQLファイル #2.tableもしくはview #3.テーブルもしくはビュー名

create_view.py
from google.cloud import bigquery
from google.oauth2 import service_account
import sys
import codecs
# PC起動の度に環境変数を設定するのが手間なので、パスを指定して実行
credentials = service_account.Credentials.from_service_account_file(
    filename= './key/発行したサービスアカウントのJSONキーファイル',
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

args = sys.argv
# 読み込み用に開き、utfに変換しエラーの場合は文字型で渡す
file = codecs.open(args[1],  mode='r', encoding='utf-8', errors='strict') 
query = file.read()  

# TABLEはReplace可能だが、VIEWは不可能なので、既存のものがある場合は削除する処理を追加
drop_data ='DROP '+args[2]+' IF EXISTS `作成先のプロジェクト名.データセット名.'+args[3]+'`'+';'+'\n'

create_data ='CREATE '+args[2]+' IF NOT EXISTS'+'\n'+'`作成先のプロジェクト名.データセット名.'+args[3]+'`'+'\n' + 'AS' + '\n'+ query

def exec_query():
    client = bigquery.Client(
        credentials=credentials,
        project=credentials.project_id,
    )
    exec_job = client.query(
                drop_data + create_data
                )
    exec_job.result() 
if __name__ == '__main__':
    exec_query()
    file.close()
schema_sample.yml
name: テーブルもしくはビューの名称。view作成用のsqlと同じにした方が管理が楽だと思います
dataset: データセットの名称
desc: テーブルもしくはビューの説明を記載してください

desc: | 文章を途中で改行したい場合は、先頭に | を追加したこちらを利用
  https://www.task-notes.com/entry/20150922/1442890800
columns:
  - name: カラムの名称。
   クエリを書くときに使用するため、名前から分かりやすい名称が好ましいです。
    type: 型を記載してください。以下サンプルの対応表です。他の型を利用したい場合は下記を参照。
    https://cloud.google.com/bigquery/docs/reference/standard-sql/conversion_rules?hl=ja
    整数:INTEGER
    数値:FLOAT
    文字列:STRING
    日付:DATE
    時間:DATETIME
    description: カラムの中身に関する説明を記載してください。
    特定の条件だと、どんな数値が取れるのか?など記載いただけると良いと思います。
  - name: test
    type: STRING
    description: こんな感じで書けば大丈夫です
  • description配下のスキーマ説明ファイルを読み込みます。
import_yml.py
import yaml
import sys
import codecs

args = sys.argv
file_path = r'.\description'+'\\'+args[1]+'.yml'

with codecs.open(file_path, 'r', 'utf-8') as file:
    obj = yaml.safe_load(file)

schema_list = obj["columns"]

def schema():
    from google.cloud.bigquery import SchemaField
    res = []
    for i in range(len(schema_list)):
            input_schema = SchemaField(name=schema_list[i]['name'], field_type=schema_list[i]['type'], description=schema_list[i]['description'])
            res.append(input_schema)
    return res

実行コマンドサンプル
python upadte_schema.py #1.作成したymlファイル名

upadte_schema.py
from google.cloud import bigquery
from google.oauth2 import service_account
from description import import_yaml

# サービスアカウントのJSONキーファイルを利用する
# パスを通すのは手間なので、下記を利用する
credentials = service_account.Credentials.from_service_account_file(
    filename= './key/発行したサービスアカウントのJSONキーファイル',
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

project_id = "更新したいテーブル・ビューのProjectを指定"
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

# 対象のテーブル・ビュー名
table_name = import_yaml.obj.get("name")
dataset_id = import_yaml.obj.get("dataset")
table_id = "{}.{}.{}".format(client.project, dataset_id, table_name)

# 以下、スキーマの説明更新
schema = import_yaml.schema()

view_schema = bigquery.Table(table_id, schema=schema)
client.update_table(view_schema, ["schema"])  # Make an API request.

# 以下、テーブル・ビュー本体の説明更新
table_ref = client.dataset(dataset_id).table(table_name)
view = client.get_table(table_ref) 
# 説明内容を読み込んで更新
view.description = import_yaml.obj.get("desc")
client.update_table(view, ["description"]) 

今後の展望

今は手動で更新を行っていますが、今後はCI/CDの仕組みも取り入れていきたいです。

スキーマ更新ですが、対象のテーブルやビューのカラムを全て記載する必要があります。
そのためテーブルの中身の更新後、スキーマ更新ファイルが実行される、というフローを構築していきたいです。

また何か分析を行う際に、このデータってどこにあるのか?この条件で抽出していいのか?という、サービス担当者への確認作業が発生したりします。
スキーマの充実によってデータカタログでの検索が容易になれば、分析者も自主的に作業ができ、お互いに本来やりたい業務に時間を割けるようになります。

社内で分析をしたいという方々が、必要な情報を、簡単に探せる環境づくりに繋がったらいいなと思いますし、
同じ悩みを抱えている方の参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?