PythonでBigQueryを扱う方法の詳細は下記の公式サイトを確認してください。
https://googleapis.dev/python/bigquery/latest/usage/index.html
データセットを操作したい場合は上記サイトのManaging Datasets
テーブル操作したい場合はManaging Tables
に記載があります。
テーブルの作成
テーブル作成後にすぐにデータを入れようとすると実行されないことがあるので注意。
from google.cloud import bigquery
# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"
schema = [
bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
# table_id = "your-project.your_dataset.your_table_name"
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table) # Make an API request.
print(
"Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
テーブル作成時のスキーマ指定方法
ステートメントの詳細についてはこちらに記載あり、スキーマの指定方法についてはBigQueryの公式サイトに詳細が書かれているので、
必要に応じてfield_typeやmodeを変更していきます。
SchemaField(name, field_type, mode='', description=None, fields=(), policy_tags=None)
# 文字列(空白あり)
SchemaField(name, 'STRING', mode='NULLABLE')
# 整数 (空白あり)
SchemaField(name, 'INT64', mode='NULLABLE')
# 浮動小数点(空白あり)
SchemaField(name, 'FLOAT64', mode='NULLABLE')
# 日付(必須)
SchemaField(name, 'DATE', mode='REQUIRED')
# 日時(必須)
SchemaField(name, 'DATETIME', mode='REQUIRED')
queryを実行する方法
公式サイトはこちら
https://googleapis.dev/python/bigquery/latest/usage/queries.html
from google.cloud import bigquery
# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"
# 実行したいqueryを記載
query = '''
select * from `tableID`
where ...
'''
result = client.query(query)
# データフレームにする場合
df = result.to_dataframe()
# for文でも取り出し可能
for row in result:
# 処理したい内容
insertする方法
一度にinsertできる行数は10,000までなので注意してください。
10,000行より多いデータを入れたい場合は後述の方法で対応可能です。
from google.cloud import bigquery
# Colabで使う場合はPJ名の指定が必要
client = bigquery.Client()
client = bigquery.Client(project=project_id) # "your-project"
# table_id = "your-project.your_dataset.your_table_name"
table = client.get_table(table_id) # Make an API request.
# リストの二次元配列
# 例ではタプルになっているがリストでも問題なし(下記はカラムが2つの場合)
# ただしスキーマの数と中のタプル(or リスト)の要素の数が異なるとエラーになるので注意
rows_to_insert = [("string", num), ("string", num)]
errors = client.insert_rows(table, rows_to_insert) # Make an API request.
# pandasのdataflameを使う場合は
errors = client.insert_rows(table, df.values.tolist())
if errors == []:
print("New rows have been added.")
dfにはdataflameが入っている想定で下記でリストの二次元配列へ変更が可能
df.values.tolist()
また、10,000行より大きいデータをinsertしたい場合は下記のように分割してあげてあげましょう。
rows = len(df)
count = 0
while rows > 0:
client.insert_rows(table, df[(count * 10000): ((count+1) * 10000)].values.tolist())
rows = rows - 10000
count = count + 1
もっと良い書き方があるかもしれませんが、上記で10000より大きくても全て追加することが可能です。