概要
Google Cloud APIを使ってアレをナニする必要性に迫られたのでメモ。
何する?
- BigQuery上でのクエリ結果を一時テーブルに保存する
- 一時テーブルの中身をCloud Storageに保存する(JSON形式)
前提条件
- GCPのサービスアカウントに以下の権限が与えられていること
- BigQuery データ編集者
- BigQuery ジョブユーザー
- ストレージのオブジェクト管理者
- サービスアカウントキーの環境変数が設定されていること
export GOOGLE_APPLICATION_CREDENTIALS="/path_to_key/project-xxxxxx.json"
仕様
- CUI
- 引数からGCP関連情報(プロジェクト、データセット、テーブル名、バケット名)を取得する。
- BigQueryのクエリを実行し、結果を一時テーブル(tmp_table)に保存する。
- 一時テーブルの内容をJSON形式(改行区切り)でCloud Storageへ保存する。
- 保存先は
gs://backet-name/results.json
- ファイル名は
result.json
で固定とする。
- 保存先は
実行手順
cd /path_to_source/
python filename.py -p "gcp-project-name" -d "bq-dataset-name" -t "bq-table-name" -b "gcs-backet-name"
パラメータ
-p : GCPのプロジェクト名
-d : BigQueryのデータセット名
-t : BigQueryのテーブル名
-b : Cloud Storageのバケット名
コードは以下。
#!/usr/bin/env python
# -*- coding: utf-8 -*
import argparse
from google.cloud import bigquery
from datetime import datetime
def parse():
usage = 'Usage: python {} -p [project] -d [dataset] -t [table] -b [backet] \n \
example: python {} -p "gcp-project-name" -d "bq-dataset-name" -t "bq-table-name" -b "gcs-backet-name"' \
.format(__file__, __file__)
# パーサの作成
psr = argparse.ArgumentParser(usage=usage)
# for GCP
psr.add_argument('-p', '--project', default = 'gcp-project-name')
psr.add_argument('-d', '--dataset', default = 'bq-dataset-name')
psr.add_argument('-t', '--table', default = 'bq-table-name')
psr.add_argument('-b', '--backet', default = 'gcs-backet-name')
args = psr.parse_args()
print(args)
return args
class IdcfGcpInfo:
def __init__(self,
project = 'gcp-project-name',
dataset_id = 'bq-dataset-name',
table_id = 'bq-table-name',
backet_name = 'gcs-backet-name',
json_file_name = 'results.json',
destination_uri = 'gs://{}/{}',
destination_format = 'NEWLINE_DELIMITED_JSON',
compression = 'NONE',
location = 'asia-northeast1',
):
self.project = project
self.dataset_id = dataset_id
self.table_id = table_id
self.backet_name = backet_name
self.json_file_name = 'results.json'
self.destination_uri = 'gs://{}/{}'.format(self.backet_name, self.json_file_name)
self.destination_format = destination_format
self.compression = compression
self.location = location
# BigQuery上で発行するクエリ
def query_on_bq(gcp_info):
sql = ''' SELECT column1, column2, column3 FROM `{}.{}.{}`;'''.format(
gcp_info.project, gcp_info.dataset_id, gcp_info.table_id
)
return sql
# BigQueryでのクエリ結果を一時テーブルに保存
def save_to_tmp_table_from_query(gcp_info):
client = bigquery.Client()
job_config = bigquery.QueryJobConfig()
# クエリ用configの指定
dataset_ref = client.dataset(
dataset_id = gcp_info.dataset_id,
project = gcp_info.project
)
tmp_ref = dataset_ref.table('tmp_table') # 一時テーブルをセット
job_config.destination = tmp_ref # 保存先に一時テーブルを指定
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE # 実行前にTRUNCATEする
sql = query_on_bq(gcp_info) # sql取得
# クエリ実行
query_job = client.query(sql, job_config = job_config)
query_job.result() # Waits for job to complete.
# 一時テーブルの内容をCloud Storageに保存
def save_to_gcs_from_tmp_table(gcp_info):
client = bigquery.Client()
dataset_ref = client.dataset(
dataset_id = gcp_info.dataset_id,
project = gcp_info.project
)
table_ref = dataset_ref.table(table_id = 'tmp_table')
# configの指定
ext_job_config = bigquery.ExtractJobConfig()
ext_job_config.destination_format = gcp_info.destination_format
ext_job_config.compression = gcp_info.compression
# 指定したバケットに保存
extract_job = client.extract_table(
source = table_ref,
destination_uris = gcp_info.destination_uri,
location = gcp_info.location,
job_config = ext_job_config
)
extract_job.result() # Waits for job to complete.
def main():
args = parse()
gcp_info = IdcfGcpInfo(
project = args.project,
dataset_id = args.dataset,
table_id = args.table,
backet_name = args.backet
)
save_to_tmp_table_from_query(gcp_info)
save_to_gcs_from_tmp_table(gcp_info)
if __name__ == '__main__':
main()
注意事項
- Cloud StorageのバケットはBigQueryと同じリージョンに配置すること!!
リージョンが異なるともれなくエラーを吐きます。 - Google公式の方法だと、JSON保存形式のオプションに
NEWLINE_DELIMITED_JSON
しか指定できません。
デリミタがカンマではなく改行になっています。
こんな感じ
{"ip":"172.26.0.0","time":"2019-01-01 00:00:01","region":"west"}
{"ip":"127.0.0.1","time":"2019-01-01 00:30:02","region":"east"}
{"ip":"192.168.1.1","time":"2019-01-01 05:30:02","region":"east"}
カンマ区切りじゃなきゃヤダ!という場合は、他のエレガントな方法を考える必要があります。
Pythonあまり触ったことがないので、ツッコミどころがあればご指摘いただけると喜びます