csvを加工してBigQueryにAppendする方法のメモ。
残課題
BigQueryのテーブルで、DATE型の列があるときのpandas.DataFrameからの書き出しが上手くいかない。
まずはColaboratoryでテスト
Colaboratoryからはうまく動作することを確認
ColaboratoryでBigQueryとGSCへアクセスするための認証
from google.colab import auth
auth.authenticate_user()
project_id = 'xxx'
!gcloud config set project {project_id}
BigQueryテーブルの最新データを取得
from google.cloud import bigquery as bq
from google.cloud import storage
dataset = "aaa"
table = "bbb"
bq_client = bq.Client(project_id) #初期化
query = "SELECT max(dy_date) as max_date FROM `{0}.{1}.{2}`".format(project_id, dataset, table)
rows = bq_client.query(query).result()
for r in rows:
max_date = r['max_date']
max_date
GCSからcsvを読み込み、pandasで加工
import pandas as pd
import datetime as dt
bucket_name = 'yyy'
file_read = 'test.csv'
s_client = storage.Client(project_id) #初期化
s_bucket = s_client.get_bucket(bucket_name)
blob = storage.Blob(file_read, s_bucket)
data = blob.download_as_string()
df = pd.read_csv(BytesIO(data))
df['date'] = pd.to_datetime(df['date'], format='%Y%m%d') #'date'列を日付型に変更
df = df[df['date'].dt.date >max_date] #BigQueryテーブルの最新日付よりも後の日付のみに絞る
df['date'] = df['date'].dt.date #日付型に変更しておく(Object型になる)
pandas.DataframeをBigQueryのテーブルにAPPEND
bq_client = bq.Client(project=project_id) #初期化
table_ref = bq_client.dataset(dataset_name).table(table_name) #BigQueryのテーブルを指定
#BigQuery のJobConfigを作成
job_config = bq.job.LoadJobConfig()
job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND
#BigQueryのテーブルにデータをAPPEND
load_job = bq_client.load_table_from_dataframe(
df
,table_ref
,job_config = job_config
,location = 'US'
,project = project_id
) # API request
#CloudFunctionsで実装
①BigQueryに書き出すためには、pyarrow, fastparquetが必要。pyarrowはバージョンの指定が必要。
main.pyの一部
import pyarrow
import fastparquet
reqirment.txt
google-cloud-storage==1.16.1
google-cloud-bigquery==1.15.0
pandas
pyarrow>=0.4
fastparquet
②BigQueryに書き出すためには、テーブルの列はすべてNULLABLEでないとならない。
③BigQueryのテーブルDATE型に指定しており、datetime.dateで型を変更したが、型の不一致でうまくいかなかった。
→一度GCSにcsvを出力し、csvからBigQueryへAPPENDする形に変更
DataframeをGCSのcsvに書き出す
s_client = storage.Client(project_id) #初期化
s_bucket = s_client.get_bucket(バケット名)
blob = storage.Blob(ファイル名.csv, s_bucket)
blob.upload_from_string(data=df.to_csv(sep=",", index=False), content_type='text/csv')
GCSからBigQueryへAPPEND
bq_client = bq.Client(project=project_id) #初期化
table_ref = bq_client.dataset(dataset_name).table(table_name)
#BigQuery のJobConfigを作成
job_config = bq.job.LoadJobConfig()
job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND
job_config.skip_leading_rows = 1
job_config.source_format = bq.SourceFormat.CSV
job_config.allow_jagged_rows = True
job_config.ignore_unknown_values = True
#BigQueryのテーブルにデータをAPPEND
load_job = bq_client.load_table_from_uri(
uri_out # uri_out = 'gs://バケット名/ファイル名.csv
,table_ref
,job_config = job_config
,location = 'US'
,project = project_id
) # API request