LoginSignup
5
2

More than 3 years have passed since last update.

Google Cloud APIを触ってごにょごにょしてみる(Python編)

Posted at

概要

Google Cloud APIを使ってアレをナニする必要性に迫られたのでメモ。

何する?

  • BigQuery上でのクエリ結果を一時テーブルに保存する
  • 一時テーブルの中身をCloud Storageに保存する(JSON形式)

前提条件

  • GCPのサービスアカウントに以下の権限が与えられていること
    • BigQuery データ編集者
    • BigQuery ジョブユーザー
    • ストレージのオブジェクト管理者
  • サービスアカウントキーの環境変数が設定されていること
export GOOGLE_APPLICATION_CREDENTIALS="/path_to_key/project-xxxxxx.json"

仕様

  • CUI
  • 引数からGCP関連情報(プロジェクト、データセット、テーブル名、バケット名)を取得する。
  • BigQueryのクエリを実行し、結果を一時テーブル(tmp_table)に保存する。
  • 一時テーブルの内容をJSON形式(改行区切り)でCloud Storageへ保存する。
    • 保存先は gs://backet-name/results.json
    • ファイル名はresult.jsonで固定とする。

実行手順

cd /path_to_source/
python filename.py -p "gcp-project-name" -d "bq-dataset-name" -t "bq-table-name" -b "gcs-backet-name"

パラメータ

-p : GCPのプロジェクト名
-d : BigQueryのデータセット名
-t : BigQueryのテーブル名
-b : Cloud Storageのバケット名

コードは以下。

#!/usr/bin/env python
# -*- coding: utf-8 -*

import argparse
from google.cloud import bigquery
from datetime import datetime

def parse():
    usage = 'Usage: python {} -p [project] -d [dataset] -t [table] -b [backet] \n \
            example: python {} -p "gcp-project-name" -d "bq-dataset-name" -t "bq-table-name" -b "gcs-backet-name"' \
            .format(__file__, __file__)
    # パーサの作成
    psr = argparse.ArgumentParser(usage=usage)
    # for GCP
    psr.add_argument('-p', '--project', default = 'gcp-project-name')
    psr.add_argument('-d', '--dataset', default = 'bq-dataset-name')
    psr.add_argument('-t', '--table',   default = 'bq-table-name')
    psr.add_argument('-b', '--backet',  default = 'gcs-backet-name')
    args = psr.parse_args()
    print(args)
    return args

class IdcfGcpInfo:
    def __init__(self,
            project             = 'gcp-project-name',
            dataset_id          = 'bq-dataset-name',
            table_id            = 'bq-table-name',
            backet_name         = 'gcs-backet-name',
            json_file_name      = 'results.json',
            destination_uri     = 'gs://{}/{}',
            destination_format  = 'NEWLINE_DELIMITED_JSON',
            compression         = 'NONE',
            location            = 'asia-northeast1',
        ):
        self.project            = project
        self.dataset_id         = dataset_id
        self.table_id           = table_id
        self.backet_name        = backet_name
        self.json_file_name     = 'results.json'
        self.destination_uri    = 'gs://{}/{}'.format(self.backet_name, self.json_file_name)
        self.destination_format = destination_format
        self.compression        = compression
        self.location           = location

# BigQuery上で発行するクエリ
def query_on_bq(gcp_info):
    sql = ''' SELECT column1, column2, column3 FROM `{}.{}.{}`;'''.format(
            gcp_info.project, gcp_info.dataset_id, gcp_info.table_id
        )
    return sql

# BigQueryでのクエリ結果を一時テーブルに保存
def save_to_tmp_table_from_query(gcp_info):
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig()

    # クエリ用configの指定
    dataset_ref = client.dataset(
        dataset_id = gcp_info.dataset_id,
        project    = gcp_info.project
    )
    tmp_ref = dataset_ref.table('tmp_table')  # 一時テーブルをセット
    job_config.destination = tmp_ref  # 保存先に一時テーブルを指定
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # 実行前にTRUNCATEする
    sql = query_on_bq(gcp_info)  # sql取得

    # クエリ実行
    query_job = client.query(sql, job_config = job_config)
    query_job.result()  # Waits for job to complete.

# 一時テーブルの内容をCloud Storageに保存
def save_to_gcs_from_tmp_table(gcp_info):
    client = bigquery.Client()
    dataset_ref = client.dataset(
        dataset_id = gcp_info.dataset_id,
        project    = gcp_info.project
    )
    table_ref = dataset_ref.table(table_id = 'tmp_table')

    # configの指定
    ext_job_config = bigquery.ExtractJobConfig()
    ext_job_config.destination_format = gcp_info.destination_format
    ext_job_config.compression = gcp_info.compression

    # 指定したバケットに保存
    extract_job = client.extract_table(
        source           = table_ref,
        destination_uris = gcp_info.destination_uri,
        location         = gcp_info.location,
        job_config       = ext_job_config
    )
    extract_job.result()  # Waits for job to complete.

def main():
    args = parse()
    gcp_info = IdcfGcpInfo(
        project     = args.project,
        dataset_id  = args.dataset,
        table_id    = args.table,
        backet_name = args.backet
    )
    save_to_tmp_table_from_query(gcp_info)
    save_to_gcs_from_tmp_table(gcp_info)

if __name__ == '__main__':
    main()

注意事項

  • Cloud StorageのバケットはBigQueryと同じリージョンに配置すること!!
    :point_right_tone1: リージョンが異なるともれなくエラーを吐きます。
  • Google公式の方法だと、JSON保存形式のオプションに NEWLINE_DELIMITED_JSON しか指定できません。
    :point_right_tone1: デリミタがカンマではなく改行になっています。

こんな感じ

{"ip":"172.26.0.0","time":"2019-01-01 00:00:01","region":"west"}
{"ip":"127.0.0.1","time":"2019-01-01 00:30:02","region":"east"}
{"ip":"192.168.1.1","time":"2019-01-01 05:30:02","region":"east"}

カンマ区切りじゃなきゃヤダ!という場合は、他のエレガントな方法を考える必要があります。

Pythonあまり触ったことがないので、ツッコミどころがあればご指摘いただけると喜びます:snake:

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2