はじめに
n番煎じかもしれませんが、PythonでBigQueryにクエリを実行するクラスを実装したので載せてみました。
色々と雑ですが少しでも参考になれば。
目次
- 準備
- コード
- まとめ
- 最後に
準備
bigqueryのライブラリを使用するには次の準備が必要です。
ライブラリのインストール
ターミナルからpipコマンドでライブラリをインストールします。
pip install --upgrade google-cloud-bigquery
pip install --upgrade db-dtypes
クレデンシャルの取得
bigqueryライブラリの使用には、サービスアカウントのクレデンシャルが必要です。
クレデンシャル取得方法は下記の通り。
*すでにBigQueryの権限を持っているサービスアカウントを発行済みの場合は4までスキップ
-
「IAMと管理」→「サービスアカウント」をクリック
-
「サービスアカウントを作成」
必要に応じて権限を付与してください。
今回は「bigquery管理者」のロールを付与します。 -
作成したサービスアカウントをクリックし、詳細を開く。
詳細を開いたら、下記の通りにクレデンシャル情報をダウンロードする。- 「キー」をクリック
- 「鍵を追加」→「新しい鍵を作成」をクリック
- 「JSON」を選択し、「作成」をクリック
- 自動でJSON形式のクレデンシャル情報が保存されます。
-
任意の場所にクレデンシャルファイルは保存します。
コード
BigQueryConnectionクラスの実装
import os
import pandas as pd
from typing import Tuple, List
from more_itertools import chunked
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
class BigQueryConnection:
def __init__( self, credential_path):
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path
self.client = bigquery.Client()
#select,delete,updateなどのDMLを実行する。実行結果をdfで返す
def bq_dml_execute( self, dml_statement):
job = self.client.query(dml_statement)
df = job.to_dataframe()
print("query executed")
return df
#DFで受け取ったデータをテーブルにインサートする
def bq_insert( self, table_id, df):
#tableの情報を取得する
table = self.client.get_table(table_id)
#インサートするデータを取得する
rows = len(df)
#データが10000行以上の場合、10000行ごとにインサートを実行する
if rows > 10000:
count = 0
while rows > 0:
job = self.client.load_table_from_dataframe(
df[(count * 10000):((count + 1 ) * 10000)],
table,
job_config = bigquery.job.LoadJobConfig(schema = table.schema))
rows -= 10000
count += 1
else:
job = self.client.load_table_from_dataframe(
df,
table,
job_config = bigquery.job.LoadJobConfig(schema = table.schema))
#上記のインサートでエラーが起こる場合、ストリーミングインサートを使用する。
def bq_streaming_insert( self, table_id, df):
#tableの情報を取得する
table = self.client.get_table(table_id)
#dfをtupleに変換
rows = [tuple(x) for x in df.values]
for chunked_rows in list(chunked(rows, 10000)):
job = self.client.insert_rows(table, chunked_rows)
def bq_drop_table( self, table_id):
job = self.client.delete_table(table_id, not_found_ok = True)
def bq_create_table( self, table_id, schema):
#schema = [
# bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
# bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
# ]
#テーブルの存在確認
Flag = self.bq_table_exists( table_id)
if not Flag :
table = bigquery.Table(table_id, schema = schema)
job = self.client.create_table(table)
else :
print(f'{table_id} already exists')
def bq_table_exists( self, table_id):
try:
self.client.get_table(table_id)
return True
except NotFound:
return False
上記クラスをインポートし、実際に使ってみると次のようになります。
クラスを使用する際の準備:
#必要なモジュールをインポートする
import pandas as pd
from google.cloud import bigquery
#作成したクラスをインポートする
from bq import BigQueryConnection as BQ
#BQクラスを起動する
bq_test = BQ("./credential.json") #クレデンシャルのパスを記入
#変数を宣言する
project = "プロジェクト名"
dataset = "データセット名"
table_name = "テーブル名"
table_id = f"{project}.{dataset}.{table_name}"
CREATE TABLE:
#スチーマを設定する
schema = [
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
bq_test.bq_create_table(table_id, schema)
DROP TABLE:
bq_test.bq_drop_table( table_id)
SELECT(DML文):
#クエリを設定する
query = "SELECT *"\
f"FROM {table_id};"
#クエリを実行し、dfで結果を返す
df = bq_test.bq_dml_execute( query)
print(df.head())
INSERT:
#インサートするデータを準備する(今回はcsvファイル)
data = pd.read_csv("test_data.csv")
bq_test.bq_insert(table_id, data)
最後に
githubに潤沢なサンプルコードがありますのでそちらを参考にした方が早いかもです。
https://github.com/googleapis/python-bigquery/blob/HEAD/samples/create_table.py