3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[Python]BigQueryにクエリを実行する

Last updated at Posted at 2022-09-24

はじめに

n番煎じかもしれませんが、PythonでBigQueryにクエリを実行するクラスを実装したので載せてみました。
色々と雑ですが少しでも参考になれば。

目次

  • 準備
  • コード
  • まとめ
  • 最後に

準備

bigqueryのライブラリを使用するには次の準備が必要です。

ライブラリのインストール

ターミナルからpipコマンドでライブラリをインストールします。

pip install --upgrade google-cloud-bigquery
pip install --upgrade db-dtypes

クレデンシャルの取得

bigqueryライブラリの使用には、サービスアカウントのクレデンシャルが必要です。
クレデンシャル取得方法は下記の通り。
*すでにBigQueryの権限を持っているサービスアカウントを発行済みの場合は4までスキップ

  1. GCPにログイン
    https://console.cloud.google.com/welcome

  2. 「IAMと管理」→「サービスアカウント」をクリック

  3. 「サービスアカウントを作成」
    必要に応じて権限を付与してください。
    今回は「bigquery管理者」のロールを付与します。

  4. 作成したサービスアカウントをクリックし、詳細を開く。
    詳細を開いたら、下記の通りにクレデンシャル情報をダウンロードする。

    1. 「キー」をクリック
    2. 「鍵を追加」→「新しい鍵を作成」をクリック
    3. 「JSON」を選択し、「作成」をクリック
    4. 自動でJSON形式のクレデンシャル情報が保存されます。
  5. 任意の場所にクレデンシャルファイルは保存します。

コード

BigQueryConnectionクラスの実装

Python bq.py
import os 
import pandas as pd
from typing import Tuple, List 
from more_itertools import chunked

from google.cloud import bigquery
from google.cloud.exceptions import NotFound


class BigQueryConnection:
    def __init__( self, credential_path):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path
        self.client = bigquery.Client()
    
    #select,delete,updateなどのDMLを実行する。実行結果をdfで返す
    def bq_dml_execute( self, dml_statement):
        job = self.client.query(dml_statement)
        df = job.to_dataframe()
        print("query executed")
        return df


    #DFで受け取ったデータをテーブルにインサートする
    def bq_insert( self, table_id, df):
        #tableの情報を取得する
        table = self.client.get_table(table_id)
        #インサートするデータを取得する
        rows = len(df)
        
        #データが10000行以上の場合、10000行ごとにインサートを実行する
        if rows > 10000:
            count = 0
            while rows > 0:
                job = self.client.load_table_from_dataframe(
                    df[(count * 10000):((count + 1 ) * 10000)],
                    table,
                    job_config = bigquery.job.LoadJobConfig(schema = table.schema))
                rows -= 10000
                count += 1
        else:
            job = self.client.load_table_from_dataframe(
                df,
                table,
                job_config = bigquery.job.LoadJobConfig(schema = table.schema))
    #上記のインサートでエラーが起こる場合、ストリーミングインサートを使用する。
    def bq_streaming_insert( self, table_id, df):
        #tableの情報を取得する
        table = self.client.get_table(table_id) 
        #dfをtupleに変換
        rows = [tuple(x) for x in df.values]
        for chunked_rows in list(chunked(rows, 10000)):
            job = self.client.insert_rows(table, chunked_rows)
    
    def bq_drop_table( self, table_id):
        job = self.client.delete_table(table_id, not_found_ok = True)
    
    
    def bq_create_table( self, table_id, schema):
        #schema = [
        #          bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
        #          bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
        #          ]
        
        #テーブルの存在確認
        Flag = self.bq_table_exists( table_id)
        
        if not Flag :
            table = bigquery.Table(table_id, schema = schema)
            job = self.client.create_table(table)
        else :
            print(f'{table_id} already exists')
        
    def bq_table_exists( self, table_id):
        try:
            self.client.get_table(table_id)
            return True
        except NotFound:
            return False          

上記クラスをインポートし、実際に使ってみると次のようになります。

クラスを使用する際の準備:

#必要なモジュールをインポートする
import pandas as pd
from google.cloud import bigquery

#作成したクラスをインポートする
from bq import BigQueryConnection as BQ

#BQクラスを起動する
bq_test = BQ("./credential.json") #クレデンシャルのパスを記入

#変数を宣言する
project = "プロジェクト名"
dataset = "データセット名"
table_name = "テーブル名"

table_id = f"{project}.{dataset}.{table_name}"

CREATE TABLE:

Python create.py
#スチーマを設定する
schema = [
          bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
          bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
          ]

bq_test.bq_create_table(table_id, schema)

DROP TABLE:

Python drop.py
bq_test.bq_drop_table( table_id)

SELECT(DML文):

Python dml.py
#クエリを設定する
query = "SELECT *"\
    f"FROM {table_id};"

#クエリを実行し、dfで結果を返す
df = bq_test.bq_dml_execute( query)
print(df.head())

INSERT:

Python insert.py
#インサートするデータを準備する(今回はcsvファイル)
data = pd.read_csv("test_data.csv")
bq_test.bq_insert(table_id, data)

最後に

githubに潤沢なサンプルコードがありますのでそちらを参考にした方が早いかもです。
https://github.com/googleapis/python-bigquery/blob/HEAD/samples/create_table.py

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?