gbq で BigQuery からデータを読み込む
pandas-gbq を用いると、クエリを与えることでデータフレームを直接作成することができます。
shell
$ pip install pandas pandas-gbq
python
import pandas as pd
from pandas.io import gbq
query = "SELECT id, company, name FROM users"
df = pd.read_gbq(query, "project-id")
データが少ない間はこれで良いですが、データが増えた時を考えると、メモリの圧迫や、データフレームに処理を加えるときの動作が重くなることが考えられます。
1レコードずつ読み込んで、ある程度の大きさでデータフレームに変換する
上記の問題を解決するための関数を書きました。
shell
$ pip install pandas google-cloud-bigquery
python
import pandas as pd
from google.cloud import bigquery
# job: query job
# columns: カラムのリスト
# group: 指定したカラムは、同一値ならば同一データフレームに保存される
def data_frame_iteration(job, columns, group = None):
chunk = 100000 # 大体のレコード数
records = []
last = None
# レコードを1件ずつ読み込む
for record in job:
item = [record[column] for column in columns]
if group is None:
if len(records) >= chunk:
yield pd.DataFrame(records, columns = columns)
records = []
else:
# 指定したカラムの値が変わったタイミングで、レコードが多ければ (chunkの9割) 一旦保存。
if record[group] != last and len(records) > chunk * 0.9:
yield pd.DataFrame(records, columns = columns)
records = []
records.append(item)
if group is not None:
# 指定したカラムの値を保存しておく
last = record[group]
# 残りを保存
if len(records) > 0:
yield pd.DataFrame(records, columns = columns)
関数に group 引数を与えない場合
client = bigquery.Client("project-id")
query = "SELECT id, company, name FROM users"
columns = ["id", "columns", "name"]
job = client.query(query)
for df in data_frame_iteration(job, columns):
pass
各 df に、関数内の chunk 数単位で分割されたレコードが入ります。
関数に group 引数を与える場合
同一 company は同一データフレームに入れて操作したいという場合は次のようにします。
client = bigquery.Client("project-id")
query = "SELECT id, company, name FROM users ORDER BY company"
columns = ["id", "columns", "name"]
job = client.query(query)
for df in data_frame_iteration(job, columns, "company"):
pass
クエリ側で、グルーピングしたいカラムでソートしてある点に注意してください。
この場合、例えば company の値がともに 1 であるデータは、同一の df に入ります。複数の df にまたがることはありません。