1
0

More than 3 years have passed since last update.

スプレッドシートからBigQueryに差分をアップデートする4つの方法

Last updated at Posted at 2020-11-10

やりたかったことはスプレッドシート情報をBigQueryに上げて差分データの更新を行うということです。

困っていたこと

元々下記のフローで処理を行っていました。

図7.png

しかしこれだと、新規テーブル作成後にすぐにデータを挿入(insert)してもデータが更新されないことが多々ありました。
SQLの実行を行うときはjob id のステータスで実行完了しているかは見えるものの、
時差がかなり大きかったり、sleepを入れるとinsertできることもありましたが、
できないこともあり安定した方法を探していました。

結論

上記のフローを下記フローに変えることで解決しました。

図11.png

テーブルの作成とデータのアップロードを別々で行っていたところを同時に行う処理に
変更したところ問題なく動きました。
聞けば当たり前ですが、この書き方をしらなかったので、いかにテーブル作成後に
「どうやって実行完了の把握をするか?」、「どうやって待機処理をするか?」という方法にばかり目がいっていました。

この記事で書いている内容

今回はスプレッドシートにで更新された情報の差分データのみを
反映させるという内容を書いています。

スプレッドシートのデータはこんな下記で用意しています。

図8.png

1行目に日本語名、2行目にBigQuery用のカラム目、3行目にデータ型、
4行目以降に実際のデータが入っているというデータ構造です。

本当はPKの設定とかも入れたかったのですが、
今回はそこは趣旨としていれていないので、A列のIDが一致すればアップデート
一致しなければ更新するという流れにしました。

BigQueryのテーブルは下記のようにアップデートされます。

図6.png

4つの実行方法

実践したのは下記の4つになります。

① SQLの実行でテーブル作成 ⇒ アップデート
② google-cloud-bigqueryでテーブル作成 ⇒ pandasからアップデート
③ pandas-gbqでテーブル作成・アップデート
④ google-cloud-bigqueryでテーブル作成・アップデート

①、②は元々考えていたフローでテーブル作成後にアップロードできないパターンです。
③と④はテーブル作成と同時にデータをアプロードしてうまくいったパターンです。

同じ悩みを抱えている方もいるかと思い、一応①、②も記載しました。

下記からコードの紹介です。

共通部分のコード

認証
from google.colab import auth
auth.authenticate_user()

上記実行すると、下記の認証コードがでてきます。
図4.png

別の記事で認証コードなしで実行する方法も紹介していますが、
異なるPJのテーブルのデータとマージさせたりする場合は、
認証コードを毎回入れた方がよいと思います。

参考:Google Colaboratoryで認証コードなしで実行する方法

スプレッドシート操作、BigQuery操作は別記事で書いているので詳細は省きます。
Pythonでのスプレッドシート操作方法(gspreadの使い方)
PythonでBigQueryを操作する方法

スプレッドシートの使用準備
!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread

# 認証処理
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())

ss_id = `スプレッドシートID`
sht_name = 'シート名'

workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()

values
# [['ID', '名前', '年齢', '好きなもの'],
#  ['id', 'name', 'age', 'favorit'],
#  ['int64', 'string', 'int64', 'string'],
#  ['1', 'Bob', '25', 'apple'],
#  ['2', 'Tom', '32', 'orange'],
#  ['3', 'Jay', '28', 'grape']]
BigQueryの使用準備
project_id = 'GCPのプロジェクトID'
dateset = 'データセット名'
table_name = 'テーブル名'

from google.cloud import bigquery
client = bigquery.Client(project=project_id)

ここまでは①~④で共有で使用する部分になります。

① SQLの実行でテーブル作成・アップデート

SQLの実行という表現が正しいのかわからないですが、
クエリエディタで操作するのと同じSQLで実行できる方法です。

メリットは書きやすいことjob idでステータスがわかることかなと思っています。

tempテーブル作成用のSQL作成
table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'

for column, dtype in zip(values[1],values[2]):
  query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query

# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)
SQLの実行
# 実行
result = client.query(query)

# ジョブの状態確認
client.get_job(result.job_id, location='asia-northeast1').state

# 'DONE'

ここまでのテーブルの作り方が①~③で違いますが、
以下の差分をアップデートするためのSQLは共通で使用します。

update用のSQL作成
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query

# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)
update用のSQL実行
client.query(query)

すべて実行を終えて、BigQuery上でクエリ結果をみると
下記画像のように成功になっています。

図9.png

ただしテーブルを見てみると、下記のようにからな状態になっています。
jobのステータスはDONEなんだから反映されていてくれよ…。

図2.png

サーバーの問題か5分ぐらいたつとデータが反映されたりするのですが、
すぐに連続して実行したいので使い物にならなかったです。

② google-cloud-bigqueryでテーブル作成・アップデート

ここに載っている内容でスプレッドシート情報をアップロードしたケースです。
https://googleapis.dev/python/bigquery/latest/usage/tables.html

テーブル削除とスキーマの作成
table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)

# テーブルがあったら削除
client.delete_table(f'{table_id}_temp', not_found_ok=True)

schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema

# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('name', 'STRING', 'NULLABLE', None, ()),
#  SchemaField('age', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]
tempテーブルの作成
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)

下記は①と同じコードなのでまとめています。

アップデート用のSQLを作成して実行
df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

# tempのテーブルにアップデート
client.insert_rows(table, df.values.tolist())

# アップデート用SQLの作成
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

# SQLを実行してデータのアップデート
client.query(query)

これでも①と同様にperson_temp のテーブルが作成される確率が低かったです。
DPとの連携時はこのコードを良く使っていたのですが、
その際はsleepを20秒ぐらい入れて動くようにしていました。

③ pandas-gbqでテーブル作成・アップデート

他のブログで邪教だと書いてあったものもあったのであまり期待していなかったですが、
やってみてよかったです。

DataFrameの作成
# 使用するためにインストールが必要
!pip install pandas-gbq

df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})

このスキーマの作成はnametypeの辞書型を作成します。

schemaの作成
schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append({'name': column, 'type': dtype})
schema

# [{'name': 'id', 'type': 'int64'},
#  {'name': 'name', 'type': 'string'},
#  {'name': 'age', 'type': 'int64'},
#  {'name': 'favorit', 'type': 'string'}]
一時テーブルへのアップデート
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)
アップデート用のSQLを作成して実行
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

こちらで問題なくアップデートができました。
ただし、下記のサイトを見るとpandas-gbqからgoogle-cloud-bigqueryへ移行してくださいと書かれています。
https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja

なので、次にgoogle-cloud-bigqueryを使った方法を記載しました。

④ google-cloud-bigqueryでテーブル作成・アップデート

pyarrowをインストール
# pyarrowをインストール
!pip install pyarrow

table_id = f'{project_id}.{dateset}.{table_name}'

df = pd.DataFrame(values[3:], columns=values[1])
# 空白削除、空白処理の処理
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
schemaの作成及びDataFrameのデータ型変更
schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column,dtype))
  if dtype != 'string':
    df[column] = df[column].astype(dtype)
schema

google-cloud-bigqueryはpandas-gbqとは違って
テーブルとDataFrameのデータ型も一致していないとエラーになります。
公式の方が設定が細かいですね…。

df.dtpyesでデータ型が変わっているかの確認が可能です。

dtypes
df.dtypes

# id          int64
# name       object
# age         int64
# favorit    object
# dtype: object

続いて、この後実行するload_table_from_dataframe()で行う設定を行います。
configは日本語で設定という意味です。

下記で設定可能な属性(Attributes)一覧が載っています。
https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html

今回はそのうちschemawrite_dispositionで既存テーブルがあったときに上書きをする設定にします。

jonの設定
job_config = bigquery.LoadJobConfig(
  schema = schema,
  write_disposition = 'WRITE_TRUNCATE'
)
dfのデータをアップロード
job = client.load_table_from_dataframe(
  df, f'{table_id}_temp', job_config=job_config
)

# Wait for the load job to complete.
job.result()

下記は①と同じコードなのでまとめています。

アップデート用のSQLを作成して実行
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

③の方がデータ型の指定が楽な分良い気がしますが、
④の方が動きが早いのかな?

あと気になっているのはデータ量が多くなった際に、
google-cloud-bigqueryでのinsert上限が1万レコードだったので
それ以上を行いたい場合の処理内容を各必要はあるのかもしれないです。

その場合は、1回目のみテーブルの差し替えで
2回目以降はisertされる処理にすれば問題なさそうです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0