やりたかったことはスプレッドシート情報をBigQueryに上げて差分データの更新を行うということです。
困っていたこと
元々下記のフローで処理を行っていました。
しかしこれだと、新規テーブル作成後にすぐにデータを挿入(insert)してもデータが更新されないことが多々ありました。
SQLの実行を行うときはjob id
のステータスで実行完了しているかは見えるものの、
時差がかなり大きかったり、sleepを入れるとinsertできることもありましたが、
できないこともあり安定した方法を探していました。
結論
上記のフローを下記フローに変えることで解決しました。
テーブルの作成とデータのアップロードを別々で行っていたところを同時に行う処理に
変更したところ問題なく動きました。
聞けば当たり前ですが、この書き方をしらなかったので、いかにテーブル作成後に
「どうやって実行完了の把握をするか?」、「どうやって待機処理をするか?」という方法にばかり目がいっていました。
この記事で書いている内容
今回はスプレッドシートにで更新された情報の差分データのみを
反映させるという内容を書いています。
スプレッドシートのデータはこんな下記で用意しています。
1行目に日本語名、2行目にBigQuery用のカラム目、3行目にデータ型、
4行目以降に実際のデータが入っているというデータ構造です。
本当はPKの設定とかも入れたかったのですが、
今回はそこは趣旨としていれていないので、A列のIDが一致すればアップデート
一致しなければ更新するという流れにしました。
BigQueryのテーブルは下記のようにアップデートされます。
4つの実行方法
実践したのは下記の4つになります。
① SQLの実行でテーブル作成 ⇒ アップデート
② google-cloud-bigqueryでテーブル作成 ⇒ pandasからアップデート
③ pandas-gbqでテーブル作成・アップデート
④ google-cloud-bigqueryでテーブル作成・アップデート
①、②は元々考えていたフローでテーブル作成後にアップロードできないパターンです。
③と④はテーブル作成と同時にデータをアプロードしてうまくいったパターンです。
同じ悩みを抱えている方もいるかと思い、一応①、②も記載しました。
下記からコードの紹介です。
共通部分のコード
from google.colab import auth
auth.authenticate_user()
別の記事で認証コードなしで実行する方法も紹介していますが、
異なるPJのテーブルのデータとマージさせたりする場合は、
認証コードを毎回入れた方がよいと思います。
参考:Google Colaboratoryで認証コードなしで実行する方法
スプレッドシート操作、BigQuery操作は別記事で書いているので詳細は省きます。
Pythonでのスプレッドシート操作方法(gspreadの使い方)
PythonでBigQueryを操作する方法
!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread
# 認証処理
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())
ss_id = `スプレッドシートID`
sht_name = 'シート名'
workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()
values
# [['ID', '名前', '年齢', '好きなもの'],
# ['id', 'name', 'age', 'favorit'],
# ['int64', 'string', 'int64', 'string'],
# ['1', 'Bob', '25', 'apple'],
# ['2', 'Tom', '32', 'orange'],
# ['3', 'Jay', '28', 'grape']]
project_id = 'GCPのプロジェクトID'
dateset = 'データセット名'
table_name = 'テーブル名'
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
ここまでは①~④で共有で使用する部分になります。
① SQLの実行でテーブル作成・アップデート
SQLの実行という表現が正しいのかわからないですが、
クエリエディタで操作するのと同じSQLで実行できる方法です。
メリットは書きやすいこと
とjob id
でステータスがわかることかなと思っています。
table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'
for column, dtype in zip(values[1],values[2]):
query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query
# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)
# 実行
result = client.query(query)
# ジョブの状態確認
client.get_job(result.job_id, location='asia-northeast1').state
# 'DONE'
ここまでのテーブルの作り方が①~③で違いますが、
以下の差分をアップデートするためのSQLは共通で使用します。
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query
# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)
client.query(query)
すべて実行を終えて、BigQuery上でクエリ結果をみると
下記画像のように成功になっています。
ただしテーブルを見てみると、下記のようにからな状態になっています。
jobのステータスはDONE
なんだから反映されていてくれよ…。
サーバーの問題か5分ぐらいたつとデータが反映されたりするのですが、
すぐに連続して実行したいので使い物にならなかったです。
② google-cloud-bigqueryでテーブル作成・アップデート
ここに載っている内容でスプレッドシート情報をアップロードしたケースです。
https://googleapis.dev/python/bigquery/latest/usage/tables.html
table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)
# テーブルがあったら削除
client.delete_table(f'{table_id}_temp', not_found_ok=True)
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema
# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
# SchemaField('name', 'STRING', 'NULLABLE', None, ()),
# SchemaField('age', 'INT64', 'NULLABLE', None, ()),
# SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)
下記は①と同じコードなのでまとめています。
df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
# tempのテーブルにアップデート
client.insert_rows(table, df.values.tolist())
# アップデート用SQLの作成
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
# SQLを実行してデータのアップデート
client.query(query)
これでも①と同様にperson_temp
のテーブルが作成される確率が低かったです。
DPとの連携時はこのコードを良く使っていたのですが、
その際はsleepを20秒ぐらい入れて動くようにしていました。
③ pandas-gbqでテーブル作成・アップデート
他のブログで邪教だと書いてあったものもあったのであまり期待していなかったですが、
やってみてよかったです。
# 使用するためにインストールが必要
!pip install pandas-gbq
df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})
このスキーマの作成はname
とtype
の辞書型を作成します。
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append({'name': column, 'type': dtype})
schema
# [{'name': 'id', 'type': 'int64'},
# {'name': 'name', 'type': 'string'},
# {'name': 'age', 'type': 'int64'},
# {'name': 'favorit', 'type': 'string'}]
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
こちらで問題なくアップデートができました。
ただし、下記のサイトを見るとpandas-gbq
からgoogle-cloud-bigquery
へ移行してくださいと書かれています。
https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja
なので、次にgoogle-cloud-bigquery
を使った方法を記載しました。
④ google-cloud-bigqueryでテーブル作成・アップデート
# pyarrowをインストール
!pip install pyarrow
table_id = f'{project_id}.{dateset}.{table_name}'
df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column,dtype))
if dtype != 'string':
df[column] = df[column].astype(dtype)
schema
google-cloud-bigqueryはpandas-gbqとは違って
テーブルとDataFrameのデータ型も一致していないとエラーになります。
公式の方が設定が細かいですね…。
df.dtpyesでデータ型が変わっているかの確認が可能です。
df.dtypes
# id int64
# name object
# age int64
# favorit object
# dtype: object
続いて、この後実行するload_table_from_dataframe()
で行う設定を行います。
config
は日本語で設定
という意味です。
下記で設定可能な属性(Attributes)一覧が載っています。
https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html
今回はそのうちschema
とwrite_disposition
で既存テーブルがあったときに上書きをする設定にします。
job_config = bigquery.LoadJobConfig(
schema = schema,
write_disposition = 'WRITE_TRUNCATE'
)
job = client.load_table_from_dataframe(
df, f'{table_id}_temp', job_config=job_config
)
# Wait for the load job to complete.
job.result()
下記は①と同じコードなのでまとめています。
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
③の方がデータ型の指定が楽な分良い気がしますが、
④の方が動きが早いのかな?
あと気になっているのはデータ量が多くなった際に、
google-cloud-bigqueryでのinsert上限が1万レコードだったので
それ以上を行いたい場合の処理内容を各必要はあるのかもしれないです。
その場合は、1回目のみテーブルの差し替えで
2回目以降はisertされる処理にすれば問題なさそうです。