はじめに
Pythonのpandas.DataFrameを使ったBigQueryの読み書きです。
Pythonのコーディングだけでなく、GCPの設定も書いてある点が、この記事の価値です。
ポイント
- GCPのサービスアカウントに必要な権限を調べた
- 認証を含めた、最小限のサンプルコードを作った
- 皆さんおなじみのpandasのDataFrameを使った
目次
- 準備
- コーディング
準備
(1) Python環境
pip install pandas-gbq google-auth
(2) GCP環境
- カスタムロールを定義
- サービスアカウントを作成
- IAMにサービスアカウントをプリンシパルとして追加し、カスタムロールを設定
- サービスアカウントの鍵を作成し、jsonファイルを入手
- "credentials"フォルダを作成し、"credentials/"を
.gitignore
へ追記し、フォルダ内に鍵のjsonファイルを置き、git status
に含まれないことを確認
- GCPロールの権限
- 付与したのは下記
- bigquery.datasets.get
- bigquery.jobs.create
- bigquery.tables.create
- 今回のサンプルはinsertだが、create tableもできるから。
- bigquery.tables.get
- テーブルのメタ情報の取得権限。
- insert時に型チェックをするから。
- bigquery.tables.getData
- bigquery.tables.updateData
- 参考
- 付与したのは下記
- 認証情報の扱い
- 手順5.はとても重要。この一連の内容は必ず一気にやる。
- フォルダを作るだけだと
git status
に出ないので、安心しないように注意。 - これが外部(publicなgithubとか)に出ると、勝手にアクセスできてしまう。
- privateすら危ないので基本的に含めない方がよい。githubのソースを使った能力測定みたいなサービスにうっかり登録し、そこから漏洩する可能性まである。そうなると個人でガードできない。
- 一度
git commit
したら、後で削除しても、履歴をたどって復活できる点にも注意。 - 万が一公開してしまった場合の対応策は、該当のサービスアカウントを消して、別のサービスアカウントを作る。
- 当然だが、公開中にダウンロードされたデータは消せない。
コーディング
短いので、全文載せます。(githubのURLを載せてる意味が...)
(1) 認証
(2) 読む(select)
(3) 書く(insert)
(1) 認証
"""
サービスアカウントを使った認証
"""
from google.oauth2 import service_account
KEY_PATH = "./credentials/sa-credentials.json"
def authenticate_with_sa()->service_account.Credentials:
# 参考
# https://cloud.google.com/bigquery/docs/authentication/service-account-file
credentials = service_account.Credentials.from_service_account_file(
KEY_PATH,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
return credentials
- ポイント
- 準備で作ったファイルを
google.oauth2.service_account.Credentials.Credentials.from_service_account_file
へ渡すだけ。
- 準備で作ったファイルを
- 参考
(2) 読む(select)
"""
BigQueryから情報を得る
"""
import os
import pandas as pd
from authenticate_with_sa import authenticate_with_sa
DATA_DIR = './data'
def get_from_bigquery():
"""get_from_bigquery
BigQueryから情報を取得
"""
# 認証
cred = authenticate_with_sa()
# クエリ
proj_id = 'tmp-20220501'
query = "SELECT * FROM `tmp-20220501.test_dataset1.owid-covid-data`"
df = pd.read_gbq(query, project_id=proj_id, credentials=cred)
print(df.head())
# CSV保存
os.makedirs(DATA_DIR, exist_ok=True)
df.to_csv(f'{DATA_DIR}/owid-covid-data.csv', header=True, index=False)
if __name__=='__main__':
get_from_bigquery()
- ポイント
- 認証後に
pandas.read_gbq()
を呼ぶだけ。
- 認証後に
- 参考
(3) 書く(insert)
"""
BigQueryへInsert
"""
import os
import pandas as pd
from authenticate_with_sa import authenticate_with_sa
DATA_DIR = './data'
def put_to_bigquery():
"""put_to_bigquery
BigQueryへinsert
※ テーブルは存在している前提
"""
# 認証
cred = authenticate_with_sa()
# 元データ
# 型: 整数, 文字列, Float, 日時(タイムスタンプ)
df = pd.read_csv(f'{DATA_DIR}/test_table1_data.csv', encoding='utf-8')
# 日時は明示的にdatetimeにしないと、to_gbq()の内部でintにしようとして、下記のエラーになる
# pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
df['col4'] = pd.to_datetime(df['col4'])
#print(df.dtypes)
#print(df.head())
# Insert
dest_table = 'test_dataset1.test_table1'
tab_schema = [
{'name': 'col1', 'type': 'INTEGER'},
{'name': 'col2', 'type': 'STRING'},
{'name': 'col3', 'type': 'FLOAT'},
{'name': 'col4', 'type': 'TIMESTAMP'},
]
proj_id = 'tmp-20220501'
df.to_gbq(
destination_table=dest_table,
if_exists='append',
table_schema=tab_schema,
project_id=proj_id,
credentials=cred
)
if __name__=='__main__':
put_to_bigquery()
- ポイント
- TIMESTAMP等の日時の項目を扱うときは、Dataframeの時点で、
pd.to_datetime()
で日時にしておく必要がある。 - 列と型を定義する配列を準備しておく。関数的には任意パラメータだけど、実質は必須なんじゃないかと思われる。
- TIMESTAMP等の日時の項目を扱うときは、Dataframeの時点で、
- 参考
おわりに
コーディング的には、あまり難しいことはない。データサイエンティスト御用達のDataFrameを使えるのは、割とうれしい。
けど、GCPのサービスアカウントとか権限周りをやったことがないとイラっとするかもしれない。特にどの権限が必要なのかは、ちょっとめんどくさい。
再:Credentialsには注意
しつこいと自覚があって何度も書くけど、サービスアカウントの鍵(jsonファイル)の扱いは注意。会社のデータが全部流出するとかのレベルの大事故になりうる。
なんらかのシステムで使う場合は、jsonファイルがシステムから流出してしまうリスクを避けるために、jsonファイルを使わず、pythonを動かすプロセスの一時的な環境変数をクライアントから設定する(サーバーに認証情報を置かない)。その場合は、pythonではos.getenv()
で取った値を使って、service_account.Credentials.from_service_account_info()
でCredentials
インスタンスを作る。